Files
ops-warden/tests/test_routing.py
tegwick 1237cc767b Complete WARDEN-WP-0012 routing scenario playbooks
Add platform-secret playbooks for issue-core ingestion, OpenRouter llm-connect,
object-storage STS, and database dynamic credentials. Extend the routing catalog
with draft entries and implement `warden route list --stale` for quarterly drift
review. Document the review cadence in AccessRouting and mark the workplan finished.
2026-06-25 10:27:23 +02:00

291 lines
9.6 KiB
Python

"""Tests for the routing pointer catalog and `warden route` CLI.
No test here requires a live subsystem — routing is a read-only pointer layer.
"""
import json
import re
from pathlib import Path
import pytest
import yaml
from typer.testing import CliRunner
from warden.cli import app
from datetime import date
from warden.routing import CatalogError, load_catalog
from warden.routing.catalog import days_since_review, find_catalog_path, is_review_stale
# Shared Typer test runner; the CLI tests in this module drive the
# `warden route` commands through runner.invoke(app, [...]).
runner = CliRunner()
def _repo_catalog() -> Path:
    """Return the catalog path reported by ``find_catalog_path``."""
    catalog_path = find_catalog_path()
    return catalog_path
def _write_catalog(tmp_path: Path, entries: list[dict]) -> Path:
    """Dump *entries* as a version-1 catalog file inside *tmp_path*.

    Returns the path of the written ``catalog.yaml``.
    """
    document = {"version": 1, "entries": entries}
    target = tmp_path / "catalog.yaml"
    target.write_text(yaml.dump(document))
    return target
# Catalog entry fixture whose ``warden_executes`` flag is True; it is the
# only fixture here that carries both ``cert_command`` and ``steps``.
# test_no_double_source_rule_rejects_routed_steps writes it next to a
# deliberately broken routed entry.
SSH_ENTRY = {
"id": "ssh-cert-host-access",
"title": "SSH cert",
"need_keywords": ["ssh", "cert", "sign"],
"owner_repo": "ops-warden",
"subsystem": "ops-warden",
"warden_executes": True,
"wiki_ref": "wiki/AccessRouting.md#issue-vs-route",
"canon_ref": "net-kingdom/docs/x.md",
"reviewed": "2026-06-18",
"status": "active",
"cert_command": "warden sign <actor> --pubkey <path>",
"steps": ["confirm inventory", "sign"],
}
# Catalog entry fixture whose ``warden_executes`` flag is False and which
# has no ``steps`` or ``cert_command`` keys.  The validation tests copy it
# and add or drop fields to provoke ``CatalogError``.
ROUTED_ENTRY = {
"id": "openbao-api-key",
"title": "API key",
"need_keywords": ["api", "key", "openbao"],
"owner_repo": "railiance-platform",
"subsystem": "OpenBao",
"warden_executes": False,
"wiki_ref": "wiki/CredentialRouting.md#routing-table",
"canon_ref": "net-kingdom/docs/x.md",
"reviewed": "2026-06-18",
"status": "active",
}
# ---------------------------------------------------------------------------
# Catalog load + validation
# ---------------------------------------------------------------------------
def test_real_catalog_loads():
    """The repository catalog loads and exposes the SSH cert entry."""
    repo_catalog = load_catalog(_repo_catalog())
    entry_count = len(repo_catalog.entries)
    assert entry_count >= 6

    ssh_entry = repo_catalog.get("ssh-cert-host-access")
    assert ssh_entry is not None
    assert ssh_entry.warden_executes is True

    command = ssh_entry.cert_command
    assert command
    assert "warden sign" in command
def test_real_catalog_has_one_executed_lane():
    """Only ``ssh-cert-host-access`` has ``warden_executes`` set."""
    repo_catalog = load_catalog(_repo_catalog())
    executed_ids = []
    for entry in repo_catalog.entries:
        if entry.warden_executes:
            executed_ids.append(entry.id)
    assert executed_ids == ["ssh-cert-host-access"]
def test_no_double_source_rule_rejects_routed_steps(tmp_path):
    """A routed entry that carries ``steps`` makes the catalog fail to load."""
    # non-SSH entry must not carry steps
    with_steps = {**ROUTED_ENTRY, "steps": ["do a thing on OpenBao"]}
    catalog_file = _write_catalog(tmp_path, [SSH_ENTRY, with_steps])
    with pytest.raises(CatalogError, match="no-double-source"):
        load_catalog(catalog_file)
def test_routed_cert_command_rejected(tmp_path):
    """A routed entry that carries ``cert_command`` is rejected on load."""
    with_command = {**ROUTED_ENTRY, "cert_command": "warden secret get"}
    catalog_file = _write_catalog(tmp_path, [with_command])
    with pytest.raises(CatalogError, match="cert_command"):
        load_catalog(catalog_file)
def test_duplicate_id_rejected(tmp_path):
    """Two entries sharing one ``id`` make the catalog fail to load."""
    twin = ROUTED_ENTRY.copy()
    catalog_file = _write_catalog(tmp_path, [ROUTED_ENTRY, twin])
    with pytest.raises(CatalogError, match="duplicate"):
        load_catalog(catalog_file)
def test_missing_field_rejected(tmp_path):
    """An entry without ``owner_repo`` is rejected, naming the field."""
    incomplete = dict(ROUTED_ENTRY)
    del incomplete["owner_repo"]
    catalog_file = _write_catalog(tmp_path, [incomplete])
    with pytest.raises(CatalogError, match="owner_repo"):
        load_catalog(catalog_file)
def test_missing_catalog_file():
    """Loading a path that does not exist raises ``CatalogError``."""
    absent = Path("/nonexistent/catalog.yaml")
    with pytest.raises(CatalogError):
        load_catalog(absent)
# ---------------------------------------------------------------------------
# find ranking
# ---------------------------------------------------------------------------
def test_find_active_excludes_draft():
    """A default ``find`` does not return ``issue-core-ingestion-api-key``."""
    repo_catalog = load_catalog(_repo_catalog())
    hits = repo_catalog.find("issue core api key")
    found_ids = [hit.id for hit in hits]
    assert "issue-core-ingestion-api-key" not in found_ids
def test_find_all_includes_draft():
    """With ``include_draft=True`` the issue-core entry is returned."""
    repo_catalog = load_catalog(_repo_catalog())
    hits = repo_catalog.find("issue core api key", include_draft=True)
    found_ids = [hit.id for hit in hits]
    assert "issue-core-ingestion-api-key" in found_ids
def test_find_ssh_tunnel_top_match():
    """``ops-bridge-tunnel`` is the first result for "ssh tunnel"."""
    repo_catalog = load_catalog(_repo_catalog())
    ranked = repo_catalog.find("ssh tunnel")
    assert ranked
    assert ranked[0].id == "ops-bridge-tunnel"
def test_find_openrouter_key():
    """``openrouter-llm-connect`` ranks first when drafts are included."""
    repo_catalog = load_catalog(_repo_catalog())
    ranked = repo_catalog.find("openrouter api key", include_draft=True)
    assert ranked
    assert ranked[0].id == "openrouter-llm-connect"
def test_find_object_storage_sts():
    """``object-storage-sts`` ranks first for an S3 temp-credential query."""
    repo_catalog = load_catalog(_repo_catalog())
    ranked = repo_catalog.find("s3 temporary credentials", include_draft=True)
    assert ranked
    assert ranked[0].id == "object-storage-sts"
# ---------------------------------------------------------------------------
# Review staleness
# ---------------------------------------------------------------------------
def test_days_since_review():
    """2026-06-01 is 23 days before the pinned ``today`` of 2026-06-24."""
    reference_day = date(2026, 6, 24)
    elapsed = days_since_review("2026-06-01", today=reference_day)
    assert elapsed == 23
def test_is_review_stale_past_threshold():
    """A January review is stale on 2026-06-24 with a 90-day threshold."""
    reference_day = date(2026, 6, 24)
    stale = is_review_stale("2026-01-01", threshold_days=90, today=reference_day)
    assert stale
def test_is_review_stale_within_threshold():
    """A review from 2026-06-01 is not stale on 2026-06-24 at 90 days."""
    reference_day = date(2026, 6, 24)
    stale = is_review_stale("2026-06-01", threshold_days=90, today=reference_day)
    assert not stale
def test_catalog_stale_filters_entries():
    """With a zero threshold, every stale entry was reviewed by 2026-06-24."""
    repo_catalog = load_catalog(_repo_catalog())
    overdue = repo_catalog.stale(threshold_days=0, today=date(2026, 6, 25))
    assert overdue
    for entry in overdue:
        assert entry.reviewed <= "2026-06-24"
# ---------------------------------------------------------------------------
# CLI (uses the repo catalog via env override)
# ---------------------------------------------------------------------------
@pytest.fixture
def repo_catalog_env(monkeypatch):
    """Point ``WARDEN_ROUTING_CATALOG`` at the repository catalog path."""
    catalog_location = str(_repo_catalog())
    monkeypatch.setenv("WARDEN_ROUTING_CATALOG", catalog_location)
def test_cli_list_active_only(repo_catalog_env):
    """``route list --json`` omits ``issue-core-ingestion-api-key``."""
    outcome = runner.invoke(app, ["route", "list", "--json"])
    assert outcome.exit_code == 0
    listed = json.loads(outcome.stdout)
    listed_ids = [row["id"] for row in listed]
    assert "issue-core-ingestion-api-key" not in listed_ids
def test_cli_list_all_includes_draft(repo_catalog_env):
    """``route list --all --json`` includes ``issue-core-ingestion-api-key``.

    The exit code is asserted before the output is parsed, matching the other
    CLI tests, so a failing command is reported with its own output rather
    than as an opaque JSON decode error.
    """
    result = runner.invoke(app, ["route", "list", "--all", "--json"])
    assert result.exit_code == 0, result.output
    ids = [e["id"] for e in json.loads(result.stdout)]
    assert "issue-core-ingestion-api-key" in ids
def test_cli_show_ssh_json_includes_cert_pattern(repo_catalog_env):
    """``route show --json`` for the SSH entry has its command and steps."""
    argv = ["route", "show", "ssh-cert-host-access", "--json"]
    outcome = runner.invoke(app, argv)
    assert outcome.exit_code == 0

    payload = json.loads(outcome.stdout)
    assert payload["warden_executes"] is True
    assert "warden sign" in payload["cert_command"]
    assert payload["steps"]
def test_cli_show_routed_has_next_action_not_steps(repo_catalog_env):
    """``route show --json`` for a routed entry has ``next_action``, no ``steps``.

    The exit code is asserted before the output is parsed, matching the other
    CLI tests, so a failing command is reported with its own output rather
    than as an opaque JSON decode error.
    """
    result = runner.invoke(app, ["route", "show", "openbao-api-key", "--json"])
    assert result.exit_code == 0, result.output
    data = json.loads(result.stdout)
    assert data["warden_executes"] is False
    assert "steps" not in data
    assert "next_action" in data
def test_cli_show_unknown_exits_one(repo_catalog_env):
    """``route show`` with an unknown id exits with status 1."""
    argv = ["route", "show", "does-not-exist"]
    outcome = runner.invoke(app, argv)
    assert outcome.exit_code == 1
def test_cli_find_json(repo_catalog_env):
    """``route find "ssh tunnel" --json`` lists ``ops-bridge-tunnel``."""
    outcome = runner.invoke(app, ["route", "find", "ssh tunnel", "--json"])
    assert outcome.exit_code == 0
    rows = json.loads(outcome.stdout)
    found_ids = [row["id"] for row in rows]
    assert "ops-bridge-tunnel" in found_ids
def test_cli_list_stale_json(repo_catalog_env):
    """``route list --stale --json`` rows carry the staleness fields."""
    argv = ["route", "list", "--stale", "--stale-days", "1", "--json"]
    outcome = runner.invoke(app, argv)
    assert outcome.exit_code == 0

    rows = json.loads(outcome.stdout)
    assert rows
    # Two passes keep the original check order: field presence for every
    # row first, then the echoed threshold.
    for row in rows:
        assert "days_since_review" in row
    for row in rows:
        assert row["stale_threshold_days"] == 1
def test_cli_list_stale_empty_with_high_threshold(repo_catalog_env):
    """A 9999-day threshold yields the "No stale" message and exit 0."""
    argv = ["route", "list", "--stale", "--stale-days", "9999"]
    outcome = runner.invoke(app, argv)
    assert outcome.exit_code == 0
    assert "No stale" in outcome.output
def test_cli_find_openrouter_draft_only_with_all(repo_catalog_env):
    """``route find --all --json`` returns ``openrouter-llm-connect``."""
    argv = ["route", "find", "openrouter api key", "--all", "--json"]
    outcome = runner.invoke(app, argv)
    assert outcome.exit_code == 0
    rows = json.loads(outcome.stdout)
    found_ids = [row["id"] for row in rows]
    assert "openrouter-llm-connect" in found_ids
# ---------------------------------------------------------------------------
# T5 drift guard — every wiki_ref anchor resolves, every entry has a reviewed date
# ---------------------------------------------------------------------------
def _github_slug(heading: str) -> str:
"""Approximate GitHub's heading-anchor slug algorithm."""
text = heading.strip().lower()
text = re.sub(r"[^\w\s-]", "", text) # drop punctuation (em-dash, parens, etc.)
text = text.replace(" ", "-")
return text
def _heading_anchors(md_path: Path) -> set[str]:
    """Collect the anchor slugs of the ATX headings in a markdown file.

    Lines inside fenced code blocks are skipped: a ``# comment`` in a shell
    or YAML snippet is not a heading, so counting it would let the drift
    guard accept an anchor that does not exist on the rendered page.

    Args:
        md_path: Markdown file to scan.

    Returns:
        The ``_github_slug`` of every ``#``..``######`` heading line found
        outside fenced code blocks.
    """
    heading_re = re.compile(r"^#{1,6}\s+(.*)$")
    anchors: set[str] = set()
    open_fence = ""  # "```" or "~~~" while inside a fenced block, else ""
    # Decode explicitly as UTF-8 so headings containing non-ASCII
    # punctuation do not depend on the platform's default encoding.
    for line in md_path.read_text(encoding="utf-8").splitlines():
        stripped = line.lstrip()
        if open_fence:
            # Only the same marker that opened the block can close it.
            if stripped.startswith(open_fence):
                open_fence = ""
            continue
        if stripped.startswith(("```", "~~~")):
            open_fence = stripped[:3]
            continue
        m = heading_re.match(line)
        if m:
            anchors.add(_github_slug(m.group(1)))
    return anchors
def test_every_wiki_ref_anchor_resolves():
    """Each ``wiki_ref`` names an existing file and, if given, a heading in it.

    All problems are gathered first and reported together in one assertion
    message.
    """
    repo_catalog = load_catalog(_repo_catalog())
    # registry/routing/catalog.yaml -> repo root
    repo_root = _repo_catalog().parents[2]
    problems = []
    for entry in repo_catalog.entries:
        rel_path, _, fragment = entry.wiki_ref.partition("#")
        doc_file = repo_root / rel_path
        if doc_file.exists():
            if fragment and fragment not in _heading_anchors(doc_file):
                problems.append(
                    f"{entry.id}: anchor #{fragment} not found in {rel_path}"
                )
        else:
            problems.append(f"{entry.id}: wiki file missing: {rel_path}")
    assert not problems, "\n".join(problems)
def test_every_entry_has_reviewed_date():
    """Every catalog entry carries a real ``YYYY-MM-DD`` review date.

    The shape is checked with ``re.fullmatch`` (a ``$``-anchored ``re.match``
    would tolerate a trailing newline), and the value is then parsed so that
    impossible dates such as ``2026-13-45`` are rejected as well.
    """
    catalog = load_catalog(_repo_catalog())
    for entry in catalog.entries:
        assert re.fullmatch(r"\d{4}-\d{2}-\d{2}", entry.reviewed), (
            f"{entry.id}: reviewed must be YYYY-MM-DD, got {entry.reviewed!r}"
        )
        try:
            date.fromisoformat(entry.reviewed)
        except ValueError as exc:
            pytest.fail(
                f"{entry.id}: reviewed is not a calendar date: "
                f"{entry.reviewed!r} ({exc})"
            )