- Add `routing_note` column (migration l9g0h1i2j3k4) to persist why a request was routed to a given domain
- Fix substring-match bug in `_route_capability`: use `\b` word-boundary regex so 'postgres' no longer matches inside 'postgresql'
- Include `title` in keyword scoring for better routing accuracy
- Return `routing_note` string from `_route_capability` and store it on the request
- Add `PATCH /capability-requests/{id}` endpoint + `CapabilityRequestPatch` schema to correct mutable metadata (catalog_entry_id, priority, blocking_task_id, fulfilling_workstream_id)
- Add `patch_capability_request` MCP tool wrapping the new endpoint
- Add 105 lines of routing tests (word-boundary, title-match, multi-entry scoring, broadcast fallback)
- Add `tunnels-up`, `tunnels-status`, `tunnels-check` Makefile targets for ops-bridge managed tunnels
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
443 lines
18 KiB
Python
443 lines
18 KiB
Python
"""
Capability Request system tests: catalog CRUD, request lifecycle, routing,
auto-notifications, and task unblocking.

All tests use a real PostgreSQL test database (no mocking).
"""
|
|
from __future__ import annotations
|
|
|
|
import pytest
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
|
|
|
|
async def _create_domain(client, slug="testdomain", name="Test Domain"):
    """Create a domain through ``POST /domains/`` and return the JSON body.

    The calling test fails (with the response text as the assertion message)
    unless the API answers 201.
    """
    payload = {"slug": slug, "name": name}
    response = await client.post("/domains/", json=payload)
    assert response.status_code == 201, response.text
    return response.json()
|
|
|
|
|
|
async def _create_topic(client, domain_slug="testdomain", slug="testtopic",
                        title="Test Topic"):
    """Create a topic through ``POST /topics/`` and return the JSON body.

    ``slug`` and ``title`` default to the values this helper previously
    hard-coded, so existing callers are unaffected; passing them explicitly
    lets a single test create more than one topic, mirroring how
    ``_create_domain`` exposes its slug and name.

    The calling test fails (with the response text as the assertion message)
    unless the API answers 201.
    """
    response = await client.post("/topics/", json={
        "slug": slug, "title": title, "domain": domain_slug,
    })
    assert response.status_code == 201, response.text
    return response.json()
|
|
|
|
|
|
async def _create_workstream(client, topic_id, slug="test-ws", title="Test WS"):
    """Create a workstream through ``POST /workstreams/`` and return the JSON body.

    ``slug`` and ``title`` default to the values this helper previously
    hard-coded, so existing callers are unaffected; passing them explicitly
    lets a single test create several workstreams under one topic.

    The calling test fails (with the response text as the assertion message)
    unless the API answers 201.
    """
    response = await client.post("/workstreams/", json={
        "topic_id": topic_id, "slug": slug, "title": title,
    })
    assert response.status_code == 201, response.text
    return response.json()
|
|
|
|
|
|
async def _create_task(client, workstream_id, title="Test task", status="blocked"):
    """Create a task and, unless ``status`` is ``"todo"``, patch it to ``status``.

    The task is first created through ``POST /tasks/`` with only the
    workstream id and title. For any requested status other than ``"todo"``
    a follow-up ``PATCH /tasks/{id}`` sets it; a ``"blocked"`` status also
    sends a fixed ``blocking_reason``.

    Returns the JSON body of the last request made (the PATCH response when a
    status change was needed, otherwise the creation response).
    """
    created = await client.post(
        "/tasks/", json={"workstream_id": workstream_id, "title": title},
    )
    assert created.status_code == 201, created.text
    task = created.json()

    # Nothing to patch when the caller wants the task left as created.
    if status == "todo":
        return task

    changes = {"status": status}
    if status == "blocked":
        changes["blocking_reason"] = "Waiting for capability request"

    patched = await client.patch(f"/tasks/{task['id']}", json=changes)
    assert patched.status_code == 200, patched.text
    return patched.json()
|
|
|
|
|
|
async def _setup_two_domains(client):
    """Create the 'custodian' (requester) and 'railiance' (provider) domains.

    Returns a ``(requester, provider)`` tuple of the two creation responses,
    created in that order.
    """
    requester = await _create_domain(client, "custodian", "Custodian")
    provider = await _create_domain(client, "railiance", "Railiance")
    return requester, provider
|
|
|
|
|
|
async def _register_catalog(client, domain="railiance", cap_type="infrastructure",
                            title="Cluster provisioning", keywords=None):
    """Register a capability catalog entry and return the JSON body.

    Posts to ``/capability-catalog/``. When ``keywords`` is omitted (``None``)
    the default list ``["cluster", "k8s", "privacy"]`` is sent. An explicitly
    passed empty list is sent as-is, so a test can register an entry with no
    keywords.

    The calling test fails (with the response text as the assertion message)
    unless the API answers 201.
    """
    # Compare against None rather than relying on truthiness: ``keywords or
    # default`` would silently replace an intentional ``[]`` with the defaults.
    if keywords is None:
        keywords = ["cluster", "k8s", "privacy"]

    response = await client.post("/capability-catalog/", json={
        "domain": domain,
        "capability_type": cap_type,
        "title": title,
        "keywords": keywords,
    })
    assert response.status_code == 201, response.text
    return response.json()
|
|
|
|
|
|
async def _create_request(client, title="Privacy idea instance",
                          description="Need a privacy idea instance on the cluster",
                          cap_type="infrastructure", agent="net-kingdom-worker",
                          domain="custodian", **kwargs):
    """Create a capability request and return the JSON body.

    Posts to ``/capability-requests/``. Any extra keyword arguments are merged
    into the request payload after the standard fields, so they can add
    fields (e.g. ``blocking_task_id``) or override the standard ones.

    The calling test fails (with the response text as the assertion message)
    unless the API answers 201.
    """
    payload = {
        "title": title,
        "description": description,
        "capability_type": cap_type,
        "requesting_agent": agent,
        "requesting_domain": domain,
    }
    # Applied last so caller-supplied fields win over the defaults above.
    payload.update(kwargs)

    response = await client.post("/capability-requests/", json=payload)
    assert response.status_code == 201, response.text
    return response.json()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Catalog tests
# ---------------------------------------------------------------------------
|
|
|
|
class TestCapabilityCatalog:
    """Capability catalog: registration, duplicate rejection, list filtering."""

    async def test_register_and_list(self, client):
        """A registered entry is echoed back and shows up in the listing."""
        await _setup_two_domains(client)
        entry = await _register_catalog(client)
        assert entry["capability_type"] == "infrastructure"
        assert entry["domain_slug"] == "railiance"

        r = await client.get("/capability-catalog/")
        assert r.status_code == 200
        assert len(r.json()) == 1

    async def test_duplicate_entry_rejected(self, client):
        """Re-posting the same domain/type/title as an existing entry gives 409."""
        await _setup_two_domains(client)
        await _register_catalog(client)
        r = await client.post("/capability-catalog/", json={
            "domain": "railiance",
            "capability_type": "infrastructure",
            "title": "Cluster provisioning",
            "keywords": [],
        })
        assert r.status_code == 409

    async def test_filter_by_domain_and_type(self, client):
        """The listing honours the ``domain`` and ``capability_type`` filters."""
        await _setup_two_domains(client)
        await _register_catalog(client, domain="railiance", cap_type="infrastructure")
        await _register_catalog(client, domain="railiance", cap_type="security",
                                title="TLS cert provisioning", keywords=["tls", "cert"])
        await _register_catalog(client, domain="custodian", cap_type="api",
                                title="MCP tool registration", keywords=["mcp"])

        # Filter by domain
        r = await client.get("/capability-catalog/", params={"domain": "railiance"})
        # Check the status before measuring the body: an error payload is a
        # dict, and its length could otherwise satisfy a count by accident.
        assert r.status_code == 200, r.text
        assert len(r.json()) == 2

        # Filter by type
        r = await client.get("/capability-catalog/", params={"capability_type": "api"})
        assert r.status_code == 200, r.text
        entries = r.json()
        assert len(entries) == 1
        assert entries[0]["domain_slug"] == "custodian"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Request lifecycle tests
# ---------------------------------------------------------------------------
|
|
|
|
class TestCapabilityRequestLifecycle:
    """Capability request creation, status transitions, and their side effects.

    Intermediate lifecycle calls are asserted as well as the final one, so a
    failure points at the step that actually broke rather than at a later
    assertion.
    """

    async def test_create_auto_routes_single_match(self, client):
        """With one matching catalog entry the request is routed to its domain."""
        await _setup_two_domains(client)
        await _register_catalog(client)
        req = await _create_request(client)

        assert req["status"] == "requested"
        assert req["fulfilling_domain_slug"] == "railiance"
        assert req["catalog_entry_id"] is not None

    async def test_create_broadcasts_when_no_catalog(self, client):
        """Without any catalog entry the request has no fulfilling domain."""
        await _setup_two_domains(client)
        req = await _create_request(client, cap_type="documentation")

        assert req["status"] == "requested"
        assert req["fulfilling_domain_slug"] is None
        assert req["catalog_entry_id"] is None

    async def test_create_broadcasts_when_ambiguous(self, client):
        """Two same-type entries with no keyword hit leave the request unrouted."""
        await _setup_two_domains(client)
        # Two entries for same type, equal keyword scores
        await _register_catalog(client, domain="railiance", cap_type="infrastructure",
                                title="K8s clusters", keywords=["k8s"])
        await _register_catalog(client, domain="custodian", cap_type="infrastructure",
                                title="Local infra", keywords=["local"])

        req = await _create_request(client, description="Need something generic")
        # Neither keyword matches, should broadcast
        assert req["fulfilling_domain_slug"] is None

    async def test_valid_transitions(self, client):
        """requested -> accepted -> in_progress -> ready_for_review -> completed."""
        await _setup_two_domains(client)
        req = await _create_request(client)
        status_url = f"/capability-requests/{req['id']}/status"

        # accepted
        r = await client.post(f"/capability-requests/{req['id']}/accept", json={
            "fulfilling_agent": "railiance-worker",
        })
        assert r.status_code == 200
        assert r.json()["status"] == "accepted"
        assert r.json()["accepted_at"] is not None

        # in_progress
        r = await client.patch(status_url, json={
            "status": "in_progress",
        })
        assert r.status_code == 200
        assert r.json()["status"] == "in_progress"

        # ready_for_review
        r = await client.patch(status_url, json={
            "status": "ready_for_review", "note": "Privacy instance is up",
        })
        assert r.status_code == 200
        assert r.json()["status"] == "ready_for_review"

        # completed
        r = await client.patch(status_url, json={
            "status": "completed", "note": "Verified and running",
        })
        assert r.status_code == 200
        assert r.json()["status"] == "completed"
        assert r.json()["completed_at"] is not None

    async def test_invalid_transitions_422(self, client):
        """Skipping lifecycle steps from ``requested`` is rejected with 422."""
        await _setup_two_domains(client)
        req = await _create_request(client)
        status_url = f"/capability-requests/{req['id']}/status"

        # requested → in_progress (must accept first)
        r = await client.patch(status_url, json={
            "status": "in_progress",
        })
        assert r.status_code == 422

        # requested → completed (skip steps)
        r = await client.patch(status_url, json={
            "status": "completed",
        })
        assert r.status_code == 422

    async def test_accept_sets_fulfilling_fields(self, client):
        """Accepting records the fulfilling agent and an acceptance timestamp."""
        await _setup_two_domains(client)
        req = await _create_request(client)

        r = await client.post(f"/capability-requests/{req['id']}/accept", json={
            "fulfilling_agent": "railiance-worker",
        })
        # Fail on the status first so an error body is not mistaken for data.
        assert r.status_code == 200, r.text
        data = r.json()
        assert data["fulfilling_agent"] == "railiance-worker"
        assert data["accepted_at"] is not None

    async def test_complete_unblocks_blocking_task(self, client):
        """Completing a request moves its blocked task back to ``todo``."""
        await _setup_two_domains(client)
        topic = await _create_topic(client, "custodian")
        ws = await _create_workstream(client, topic["id"])
        task = await _create_task(client, ws["id"], status="blocked")

        req = await _create_request(client, blocking_task_id=task["id"])

        # Walk through lifecycle
        r = await client.post(f"/capability-requests/{req['id']}/accept", json={
            "fulfilling_agent": "railiance-worker",
        })
        assert r.status_code == 200, r.text
        for status in ("in_progress", "ready_for_review", "completed"):
            r = await client.patch(f"/capability-requests/{req['id']}/status", json={
                "status": status,
            })
            assert r.status_code == 200, r.text

        # Verify task was unblocked
        r = await client.get(f"/tasks/{task['id']}")
        assert r.status_code == 200
        assert r.json()["status"] == "todo"

    async def test_complete_without_blocking_task(self, client):
        """Completing a request that blocks no task succeeds."""
        await _setup_two_domains(client)
        req = await _create_request(client)

        r = await client.post(f"/capability-requests/{req['id']}/accept", json={
            "fulfilling_agent": "railiance-worker",
        })
        assert r.status_code == 200, r.text
        for status in ("in_progress", "ready_for_review"):
            r = await client.patch(f"/capability-requests/{req['id']}/status", json={
                "status": status,
            })
            assert r.status_code == 200, r.text

        r = await client.patch(f"/capability-requests/{req['id']}/status", json={
            "status": "completed",
        })
        # Should succeed without error (no task to unblock)
        assert r.status_code == 200
        assert r.json()["status"] == "completed"

    async def test_notification_created_on_each_transition(self, client):
        """Creation, acceptance and ready-for-review each leave a message."""
        await _setup_two_domains(client)
        await _register_catalog(client)
        req = await _create_request(client)

        # Check notification was sent on creation (to railiance since it was auto-routed)
        r = await client.get("/messages/", params={"to_agent": "railiance"})
        assert r.status_code == 200, r.text
        msgs = r.json()
        assert any("[capability-request]" in m["subject"] for m in msgs)

        # Accept
        r = await client.post(f"/capability-requests/{req['id']}/accept", json={
            "fulfilling_agent": "railiance-worker",
        })
        assert r.status_code == 200, r.text
        r = await client.get("/messages/", params={"to_agent": "net-kingdom-worker"})
        assert r.status_code == 200, r.text
        msgs = r.json()
        assert any("[capability-accepted]" in m["subject"] for m in msgs)

        # ready_for_review
        for status in ("in_progress", "ready_for_review"):
            r = await client.patch(f"/capability-requests/{req['id']}/status", json={
                "status": status,
            })
            assert r.status_code == 200, r.text
        r = await client.get("/messages/", params={"to_agent": "net-kingdom-worker"})
        assert r.status_code == 200, r.text
        msgs = r.json()
        assert any("[capability-ready]" in m["subject"] for m in msgs)

    async def test_list_filters_by_domain_status_type(self, client):
        """The request listing honours the type, status and domain filters."""
        await _setup_two_domains(client)
        await _create_request(client, title="Req A")
        await _create_request(client, title="Req B", cap_type="security")

        # Filter by type
        r = await client.get("/capability-requests/", params={"capability_type": "security"})
        # Check the status before measuring the body: an error payload is a
        # dict, and its length could otherwise satisfy a count by accident.
        assert r.status_code == 200, r.text
        matches = r.json()
        assert len(matches) == 1
        assert matches[0]["title"] == "Req B"

        # Filter by status
        r = await client.get("/capability-requests/", params={"status": "requested"})
        assert r.status_code == 200, r.text
        assert len(r.json()) == 2

        # Filter by domain
        r = await client.get("/capability-requests/", params={"domain": "custodian"})
        assert r.status_code == 200, r.text
        assert len(r.json()) == 2  # requesting domain is custodian for both

    async def test_withdrawn_transition(self, client):
        """A request can be withdrawn, after which further transitions get 422."""
        await _setup_two_domains(client)
        req = await _create_request(client)

        r = await client.patch(f"/capability-requests/{req['id']}/status", json={
            "status": "withdrawn", "note": "No longer needed",
        })
        assert r.status_code == 200
        assert r.json()["status"] == "withdrawn"

        # Terminal — cannot transition further
        r = await client.patch(f"/capability-requests/{req['id']}/status", json={
            "status": "requested",
        })
        assert r.status_code == 422
|
|
|
|
|
|
class TestCapabilityRequestRouting:
    """Keyword routing of new requests and hub corrections via PATCH."""

    async def test_word_boundary_avoids_substring_false_positive(self, client):
        """'postgres' keyword must not match inside 'postgresql'."""
        await _setup_two_domains(client)
        # Register two entries: PostgreSQL HA (keywords: postgresql, postgres, ha)
        # and K3s provisioning (keywords: k3s, cluster, k8s)
        await _register_catalog(client, domain="railiance", cap_type="infrastructure",
                                title="PostgreSQL HA", keywords=["postgresql", "postgres", "ha"])
        k3s_entry = await _register_catalog(client, domain="custodian", cap_type="infrastructure",
                                            title="K3s provisioning", keywords=["k3s", "cluster", "k8s"])

        # Description mentions k8s and cluster but NOT standalone "postgres" or "ha"
        # NOTE(review): none of the PostgreSQL entry's keywords occur in this
        # title/description even as substrings, so this case would presumably
        # also pass with plain substring matching — consider adding text that
        # embeds a keyword inside a longer word to pin the regression.
        req = await _create_request(
            client,
            title="K3s cluster access",
            description="Need k8s foundations. cluster must be up. kubeconfig required.",
            cap_type="infrastructure",
        )
        # k3s entry should win (k8s + cluster match), not postgres entry
        assert req["catalog_entry_id"] == k3s_entry["id"]
        # Routing note should mention k3s (case-insensitively)
        assert req["routing_note"] is not None
        assert "k3s" in req["routing_note"].lower()

    async def test_title_included_in_routing(self, client):
        """Title keywords should contribute to routing score."""
        await _setup_two_domains(client)
        k3s_entry = await _register_catalog(client, domain="railiance", cap_type="infrastructure",
                                            title="K3s cluster", keywords=["k3s", "cluster", "kubernetes"])
        await _register_catalog(client, domain="custodian", cap_type="infrastructure",
                                title="Postgres DB", keywords=["postgresql", "postgres", "database"])

        # Title contains "k3s" but description is generic
        req = await _create_request(
            client,
            title="k3s cluster access needed",
            description="Need access to proceed with deployment.",
            cap_type="infrastructure",
        )
        assert req["routing_note"] is not None
        # Should match k3s (title has "k3s" and "cluster")
        assert req["catalog_entry_id"] == k3s_entry["id"]
        # Verify via routing note (case-insensitively)
        assert "k3s" in req["routing_note"].lower()

    async def test_ambiguous_routing_broadcasts(self, client):
        """Tied scores should broadcast (no fulfilling domain)."""
        await _setup_two_domains(client)
        await _register_catalog(client, domain="railiance", cap_type="infrastructure",
                                title="Entry A", keywords=["cluster"])
        await _register_catalog(client, domain="custodian", cap_type="infrastructure",
                                title="Entry B", keywords=["cluster"])
        req = await _create_request(
            client,
            title="cluster needed",
            description="cluster access",
            cap_type="infrastructure",
        )
        assert req["fulfilling_domain_slug"] is None
        assert "ambiguous" in req["routing_note"]

    async def test_patch_corrects_catalog_entry_and_reroutes(self, client):
        """PATCH /capability-requests/{id} corrects catalog_entry_id and re-derives domain."""
        await _setup_two_domains(client)
        # Register the entry to move away from (postgres) and the correct one (k3s)
        await _register_catalog(client, domain="railiance", cap_type="infrastructure",
                                title="PostgreSQL HA", keywords=["postgresql"])
        correct = await _register_catalog(client, domain="custodian", cap_type="infrastructure",
                                          title="K3s cluster", keywords=["k3s"])

        # The description names a keyword of BOTH entries ("postgresql" and
        # "k3s"), so the initial routing outcome is deliberately not asserted;
        # this test only checks the correction applied below.
        req = await _create_request(
            client,
            description="Need postgresql and k3s access",
            cap_type="infrastructure",
        )

        # Patch to correct catalog entry
        r = await client.patch(f"/capability-requests/{req['id']}", json={
            "catalog_entry_id": correct["id"],
        })
        assert r.status_code == 200
        data = r.json()
        assert data["catalog_entry_id"] == correct["id"]
        assert data["fulfilling_domain_slug"] == "custodian"
        assert "hub correction" in data["routing_note"]
        assert "K3s" in data["routing_note"]

    async def test_patch_unknown_catalog_entry_404(self, client):
        """Patching to a catalog entry id that does not exist returns 404."""
        import uuid as _uuid

        await _setup_two_domains(client)
        req = await _create_request(client, cap_type="security")
        r = await client.patch(f"/capability-requests/{req['id']}", json={
            # A freshly generated random UUID, so no registered entry has it.
            "catalog_entry_id": str(_uuid.uuid4()),
        })
        assert r.status_code == 404

    async def test_patch_priority(self, client):
        """Patching the priority updates it and is mentioned in the routing note."""
        await _setup_two_domains(client)
        req = await _create_request(client, cap_type="security")
        r = await client.patch(f"/capability-requests/{req['id']}", json={"priority": "critical"})
        assert r.status_code == 200
        data = r.json()
        assert data["priority"] == "critical"
        assert "priority" in data["routing_note"]
|