Files
the-custodian/state-hub/tests/test_capability_requests.py
tegwick f85c5e4d49 feat(capability-requests): add cross-domain capability catalog and request routing
Introduces a capability catalog (CUST-WP-0022) so domains can advertise what
they provide and agents can request capabilities from other domains with
auto-routing, lifecycle tracking, and task-unblocking on completion.

- New models: CapabilityCatalog, CapabilityRequest with full lifecycle
  (requested → accepted → in_progress → ready_for_review → completed/rejected/withdrawn)
- Migration i6d7e8f9a0b1: capability_catalog + capability_requests tables
- Router /capability-catalog and /capability-requests with accept/status endpoints
- 7 new MCP tools: register_capability, list_capabilities, request_capability,
  accept_capability_request, update_capability_request_status,
  list_capability_requests, get_capability_request
- StateSummary gains open_capability_requests count
- Dashboard: capability-requests.md page + docs/capabilities.md + docs/scope.md
- SCOPE.md: three seed capabilities documented (MCP registration, state tracking, SBOM)
- scope.template: Provided Capabilities section with example block
- scripts/ingest_capabilities.py + make ingest-capabilities[/-all] targets

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
2026-03-19 21:07:50 +01:00

338 lines
13 KiB
Python

"""
Capability Request system tests: catalog CRUD, request lifecycle, routing,
auto-notifications, and task unblocking.
All tests use a real PostgreSQL test database (no mocking).
"""
from __future__ import annotations
import pytest
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
async def _create_domain(client, slug="testdomain", name="Test Domain"):
r = await client.post("/domains/", json={"slug": slug, "name": name})
assert r.status_code == 201, r.text
return r.json()
async def _create_topic(client, domain_slug="testdomain"):
r = await client.post("/topics/", json={
"slug": "testtopic", "title": "Test Topic", "domain": domain_slug,
})
assert r.status_code == 201, r.text
return r.json()
async def _create_workstream(client, topic_id):
r = await client.post("/workstreams/", json={
"topic_id": topic_id, "slug": "test-ws", "title": "Test WS",
})
assert r.status_code == 201, r.text
return r.json()
async def _create_task(client, workstream_id, title="Test task", status="blocked"):
r = await client.post("/tasks/", json={
"workstream_id": workstream_id, "title": title,
})
assert r.status_code == 201, r.text
task = r.json()
if status != "todo":
patch = {"status": status}
if status == "blocked":
patch["blocking_reason"] = "Waiting for capability request"
r2 = await client.patch(f"/tasks/{task['id']}", json=patch)
assert r2.status_code == 200, r2.text
return r2.json()
return task
async def _setup_two_domains(client):
"""Create two domains: 'custodian' (requester) and 'railiance' (provider)."""
req_domain = await _create_domain(client, "custodian", "Custodian")
ful_domain = await _create_domain(client, "railiance", "Railiance")
return req_domain, ful_domain
async def _register_catalog(client, domain="railiance", cap_type="infrastructure",
title="Cluster provisioning", keywords=None):
r = await client.post("/capability-catalog/", json={
"domain": domain,
"capability_type": cap_type,
"title": title,
"keywords": keywords or ["cluster", "k8s", "privacy"],
})
assert r.status_code == 201, r.text
return r.json()
async def _create_request(client, title="Privacy idea instance",
description="Need a privacy idea instance on the cluster",
cap_type="infrastructure", agent="net-kingdom-worker",
domain="custodian", **kwargs):
r = await client.post("/capability-requests/", json={
"title": title,
"description": description,
"capability_type": cap_type,
"requesting_agent": agent,
"requesting_domain": domain,
**kwargs,
})
assert r.status_code == 201, r.text
return r.json()
# ---------------------------------------------------------------------------
# Catalog tests
# ---------------------------------------------------------------------------
class TestCapabilityCatalog:
async def test_register_and_list(self, client):
await _setup_two_domains(client)
entry = await _register_catalog(client)
assert entry["capability_type"] == "infrastructure"
assert entry["domain_slug"] == "railiance"
r = await client.get("/capability-catalog/")
assert r.status_code == 200
assert len(r.json()) == 1
async def test_duplicate_entry_rejected(self, client):
await _setup_two_domains(client)
await _register_catalog(client)
r = await client.post("/capability-catalog/", json={
"domain": "railiance",
"capability_type": "infrastructure",
"title": "Cluster provisioning",
"keywords": [],
})
assert r.status_code == 409
async def test_filter_by_domain_and_type(self, client):
await _setup_two_domains(client)
await _register_catalog(client, domain="railiance", cap_type="infrastructure")
await _register_catalog(client, domain="railiance", cap_type="security",
title="TLS cert provisioning", keywords=["tls", "cert"])
await _register_catalog(client, domain="custodian", cap_type="api",
title="MCP tool registration", keywords=["mcp"])
# Filter by domain
r = await client.get("/capability-catalog/", params={"domain": "railiance"})
assert len(r.json()) == 2
# Filter by type
r = await client.get("/capability-catalog/", params={"capability_type": "api"})
assert len(r.json()) == 1
assert r.json()[0]["domain_slug"] == "custodian"
# ---------------------------------------------------------------------------
# Request lifecycle tests
# ---------------------------------------------------------------------------
class TestCapabilityRequestLifecycle:
async def test_create_auto_routes_single_match(self, client):
await _setup_two_domains(client)
await _register_catalog(client)
req = await _create_request(client)
assert req["status"] == "requested"
assert req["fulfilling_domain_slug"] == "railiance"
assert req["catalog_entry_id"] is not None
async def test_create_broadcasts_when_no_catalog(self, client):
await _setup_two_domains(client)
req = await _create_request(client, cap_type="documentation")
assert req["status"] == "requested"
assert req["fulfilling_domain_slug"] is None
assert req["catalog_entry_id"] is None
async def test_create_broadcasts_when_ambiguous(self, client):
await _setup_two_domains(client)
# Two entries for same type, equal keyword scores
await _register_catalog(client, domain="railiance", cap_type="infrastructure",
title="K8s clusters", keywords=["k8s"])
await _register_catalog(client, domain="custodian", cap_type="infrastructure",
title="Local infra", keywords=["local"])
req = await _create_request(client, description="Need something generic")
# Neither keyword matches, should broadcast
assert req["fulfilling_domain_slug"] is None
async def test_valid_transitions(self, client):
await _setup_two_domains(client)
req = await _create_request(client)
# accepted
r = await client.post(f"/capability-requests/{req['id']}/accept", json={
"fulfilling_agent": "railiance-worker",
})
assert r.status_code == 200
assert r.json()["status"] == "accepted"
assert r.json()["accepted_at"] is not None
# in_progress
r = await client.patch(f"/capability-requests/{req['id']}/status", json={
"status": "in_progress",
})
assert r.status_code == 200
assert r.json()["status"] == "in_progress"
# ready_for_review
r = await client.patch(f"/capability-requests/{req['id']}/status", json={
"status": "ready_for_review", "note": "Privacy instance is up",
})
assert r.status_code == 200
assert r.json()["status"] == "ready_for_review"
# completed
r = await client.patch(f"/capability-requests/{req['id']}/status", json={
"status": "completed", "note": "Verified and running",
})
assert r.status_code == 200
assert r.json()["status"] == "completed"
assert r.json()["completed_at"] is not None
async def test_invalid_transitions_422(self, client):
await _setup_two_domains(client)
req = await _create_request(client)
# requested → in_progress (must accept first)
r = await client.patch(f"/capability-requests/{req['id']}/status", json={
"status": "in_progress",
})
assert r.status_code == 422
# requested → completed (skip steps)
r = await client.patch(f"/capability-requests/{req['id']}/status", json={
"status": "completed",
})
assert r.status_code == 422
async def test_accept_sets_fulfilling_fields(self, client):
await _setup_two_domains(client)
req = await _create_request(client)
r = await client.post(f"/capability-requests/{req['id']}/accept", json={
"fulfilling_agent": "railiance-worker",
})
data = r.json()
assert data["fulfilling_agent"] == "railiance-worker"
assert data["accepted_at"] is not None
async def test_complete_unblocks_blocking_task(self, client):
await _setup_two_domains(client)
topic = await _create_topic(client, "custodian")
ws = await _create_workstream(client, topic["id"])
task = await _create_task(client, ws["id"], status="blocked")
req = await _create_request(client, blocking_task_id=task["id"])
# Walk through lifecycle
await client.post(f"/capability-requests/{req['id']}/accept", json={
"fulfilling_agent": "railiance-worker",
})
await client.patch(f"/capability-requests/{req['id']}/status", json={
"status": "in_progress",
})
await client.patch(f"/capability-requests/{req['id']}/status", json={
"status": "ready_for_review",
})
r = await client.patch(f"/capability-requests/{req['id']}/status", json={
"status": "completed",
})
assert r.status_code == 200
# Verify task was unblocked
r = await client.get(f"/tasks/{task['id']}")
assert r.status_code == 200
assert r.json()["status"] == "todo"
async def test_complete_without_blocking_task(self, client):
await _setup_two_domains(client)
req = await _create_request(client)
await client.post(f"/capability-requests/{req['id']}/accept", json={
"fulfilling_agent": "railiance-worker",
})
await client.patch(f"/capability-requests/{req['id']}/status", json={
"status": "in_progress",
})
await client.patch(f"/capability-requests/{req['id']}/status", json={
"status": "ready_for_review",
})
r = await client.patch(f"/capability-requests/{req['id']}/status", json={
"status": "completed",
})
# Should succeed without error (no task to unblock)
assert r.status_code == 200
assert r.json()["status"] == "completed"
async def test_notification_created_on_each_transition(self, client):
await _setup_two_domains(client)
await _register_catalog(client)
req = await _create_request(client)
# Check notification was sent on creation (to railiance since it was auto-routed)
r = await client.get("/messages/", params={"to_agent": "railiance"})
msgs = r.json()
assert any("[capability-request]" in m["subject"] for m in msgs)
# Accept
await client.post(f"/capability-requests/{req['id']}/accept", json={
"fulfilling_agent": "railiance-worker",
})
r = await client.get("/messages/", params={"to_agent": "net-kingdom-worker"})
msgs = r.json()
assert any("[capability-accepted]" in m["subject"] for m in msgs)
# ready_for_review
await client.patch(f"/capability-requests/{req['id']}/status", json={
"status": "in_progress",
})
await client.patch(f"/capability-requests/{req['id']}/status", json={
"status": "ready_for_review",
})
r = await client.get("/messages/", params={"to_agent": "net-kingdom-worker"})
msgs = r.json()
assert any("[capability-ready]" in m["subject"] for m in msgs)
async def test_list_filters_by_domain_status_type(self, client):
await _setup_two_domains(client)
await _create_request(client, title="Req A")
await _create_request(client, title="Req B", cap_type="security")
# Filter by type
r = await client.get("/capability-requests/", params={"capability_type": "security"})
assert len(r.json()) == 1
assert r.json()[0]["title"] == "Req B"
# Filter by status
r = await client.get("/capability-requests/", params={"status": "requested"})
assert len(r.json()) == 2
# Filter by domain
r = await client.get("/capability-requests/", params={"domain": "custodian"})
assert len(r.json()) == 2 # requesting domain is custodian for both
async def test_withdrawn_transition(self, client):
await _setup_two_domains(client)
req = await _create_request(client)
r = await client.patch(f"/capability-requests/{req['id']}/status", json={
"status": "withdrawn", "note": "No longer needed",
})
assert r.status_code == 200
assert r.json()["status"] == "withdrawn"
# Terminal — cannot transition further
r = await client.patch(f"/capability-requests/{req['id']}/status", json={
"status": "requested",
})
assert r.status_code == 422