Files
state-hub/tests/test_capability_requests.py

443 lines
18 KiB
Python

"""
Capability Request system tests: catalog CRUD, request lifecycle, routing,
auto-notifications, and task unblocking.
All tests use a real PostgreSQL test database (no mocking).
"""
from __future__ import annotations
import pytest
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
async def _create_domain(client, slug="testdomain", name="Test Domain"):
r = await client.post("/domains/", json={"slug": slug, "name": name})
assert r.status_code == 201, r.text
return r.json()
async def _create_topic(client, domain_slug="testdomain"):
r = await client.post("/topics/", json={
"slug": "testtopic", "title": "Test Topic", "domain": domain_slug,
})
assert r.status_code == 201, r.text
return r.json()
async def _create_workstream(client, topic_id):
r = await client.post("/workstreams/", json={
"topic_id": topic_id, "slug": "test-ws", "title": "Test WS",
})
assert r.status_code == 201, r.text
return r.json()
async def _create_task(client, workstream_id, title="Test task", status="wait"):
r = await client.post("/tasks/", json={
"workstream_id": workstream_id, "title": title,
})
assert r.status_code == 201, r.text
task = r.json()
if status != "todo":
patch = {"status": status}
if status == "wait":
patch["blocking_reason"] = "Waiting for capability request"
r2 = await client.patch(f"/tasks/{task['id']}", json=patch)
assert r2.status_code == 200, r2.text
return r2.json()
return task
async def _setup_two_domains(client):
"""Create two domains: 'custodian' (requester) and 'railiance' (provider)."""
req_domain = await _create_domain(client, "custodian", "Custodian")
ful_domain = await _create_domain(client, "railiance", "Railiance")
return req_domain, ful_domain
async def _register_catalog(client, domain="railiance", cap_type="infrastructure",
title="Cluster provisioning", keywords=None):
r = await client.post("/capability-catalog/", json={
"domain": domain,
"capability_type": cap_type,
"title": title,
"keywords": keywords or ["cluster", "k8s", "privacy"],
})
assert r.status_code == 201, r.text
return r.json()
async def _create_request(client, title="Privacy idea instance",
description="Need a privacy idea instance on the cluster",
cap_type="infrastructure", agent="net-kingdom-worker",
domain="custodian", **kwargs):
r = await client.post("/capability-requests/", json={
"title": title,
"description": description,
"capability_type": cap_type,
"requesting_agent": agent,
"requesting_domain": domain,
**kwargs,
})
assert r.status_code == 201, r.text
return r.json()
# ---------------------------------------------------------------------------
# Catalog tests
# ---------------------------------------------------------------------------
class TestCapabilityCatalog:
async def test_register_and_list(self, client):
await _setup_two_domains(client)
entry = await _register_catalog(client)
assert entry["capability_type"] == "infrastructure"
assert entry["domain_slug"] == "railiance"
r = await client.get("/capability-catalog/")
assert r.status_code == 200
assert len(r.json()) == 1
async def test_duplicate_entry_rejected(self, client):
await _setup_two_domains(client)
await _register_catalog(client)
r = await client.post("/capability-catalog/", json={
"domain": "railiance",
"capability_type": "infrastructure",
"title": "Cluster provisioning",
"keywords": [],
})
assert r.status_code == 409
async def test_filter_by_domain_and_type(self, client):
await _setup_two_domains(client)
await _register_catalog(client, domain="railiance", cap_type="infrastructure")
await _register_catalog(client, domain="railiance", cap_type="security",
title="TLS cert provisioning", keywords=["tls", "cert"])
await _register_catalog(client, domain="custodian", cap_type="api",
title="MCP tool registration", keywords=["mcp"])
# Filter by domain
r = await client.get("/capability-catalog/", params={"domain": "railiance"})
assert len(r.json()) == 2
# Filter by type
r = await client.get("/capability-catalog/", params={"capability_type": "api"})
assert len(r.json()) == 1
assert r.json()[0]["domain_slug"] == "custodian"
# ---------------------------------------------------------------------------
# Request lifecycle tests
# ---------------------------------------------------------------------------
class TestCapabilityRequestLifecycle:
async def test_create_auto_routes_single_match(self, client):
await _setup_two_domains(client)
await _register_catalog(client)
req = await _create_request(client)
assert req["status"] == "requested"
assert req["fulfilling_domain_slug"] == "railiance"
assert req["catalog_entry_id"] is not None
async def test_create_broadcasts_when_no_catalog(self, client):
await _setup_two_domains(client)
req = await _create_request(client, cap_type="documentation")
assert req["status"] == "requested"
assert req["fulfilling_domain_slug"] is None
assert req["catalog_entry_id"] is None
async def test_create_broadcasts_when_ambiguous(self, client):
await _setup_two_domains(client)
# Two entries for same type, equal keyword scores
await _register_catalog(client, domain="railiance", cap_type="infrastructure",
title="K8s clusters", keywords=["k8s"])
await _register_catalog(client, domain="custodian", cap_type="infrastructure",
title="Local infra", keywords=["local"])
req = await _create_request(client, description="Need something generic")
# Neither keyword matches, should broadcast
assert req["fulfilling_domain_slug"] is None
async def test_valid_transitions(self, client):
await _setup_two_domains(client)
req = await _create_request(client)
# accepted
r = await client.post(f"/capability-requests/{req['id']}/accept", json={
"fulfilling_agent": "railiance-worker",
})
assert r.status_code == 200
assert r.json()["status"] == "accepted"
assert r.json()["accepted_at"] is not None
# in_progress
r = await client.patch(f"/capability-requests/{req['id']}/status", json={
"status": "in_progress",
})
assert r.status_code == 200
assert r.json()["status"] == "in_progress"
# ready_for_review
r = await client.patch(f"/capability-requests/{req['id']}/status", json={
"status": "ready_for_review", "note": "Privacy instance is up",
})
assert r.status_code == 200
assert r.json()["status"] == "ready_for_review"
# completed
r = await client.patch(f"/capability-requests/{req['id']}/status", json={
"status": "completed", "note": "Verified and running",
})
assert r.status_code == 200
assert r.json()["status"] == "completed"
assert r.json()["completed_at"] is not None
async def test_invalid_transitions_422(self, client):
await _setup_two_domains(client)
req = await _create_request(client)
# requested → in_progress (must accept first)
r = await client.patch(f"/capability-requests/{req['id']}/status", json={
"status": "in_progress",
})
assert r.status_code == 422
# requested → completed (skip steps)
r = await client.patch(f"/capability-requests/{req['id']}/status", json={
"status": "completed",
})
assert r.status_code == 422
async def test_accept_sets_fulfilling_fields(self, client):
await _setup_two_domains(client)
req = await _create_request(client)
r = await client.post(f"/capability-requests/{req['id']}/accept", json={
"fulfilling_agent": "railiance-worker",
})
data = r.json()
assert data["fulfilling_agent"] == "railiance-worker"
assert data["accepted_at"] is not None
async def test_complete_unblocks_blocking_task(self, client):
await _setup_two_domains(client)
topic = await _create_topic(client, "custodian")
ws = await _create_workstream(client, topic["id"])
task = await _create_task(client, ws["id"], status="wait")
req = await _create_request(client, blocking_task_id=task["id"])
# Walk through lifecycle
await client.post(f"/capability-requests/{req['id']}/accept", json={
"fulfilling_agent": "railiance-worker",
})
await client.patch(f"/capability-requests/{req['id']}/status", json={
"status": "in_progress",
})
await client.patch(f"/capability-requests/{req['id']}/status", json={
"status": "ready_for_review",
})
r = await client.patch(f"/capability-requests/{req['id']}/status", json={
"status": "completed",
})
assert r.status_code == 200
# Verify task was unblocked
r = await client.get(f"/tasks/{task['id']}")
assert r.status_code == 200
assert r.json()["status"] == "todo"
async def test_complete_without_blocking_task(self, client):
await _setup_two_domains(client)
req = await _create_request(client)
await client.post(f"/capability-requests/{req['id']}/accept", json={
"fulfilling_agent": "railiance-worker",
})
await client.patch(f"/capability-requests/{req['id']}/status", json={
"status": "in_progress",
})
await client.patch(f"/capability-requests/{req['id']}/status", json={
"status": "ready_for_review",
})
r = await client.patch(f"/capability-requests/{req['id']}/status", json={
"status": "completed",
})
# Should succeed without error (no task to unblock)
assert r.status_code == 200
assert r.json()["status"] == "completed"
async def test_notification_created_on_each_transition(self, client):
await _setup_two_domains(client)
await _register_catalog(client)
req = await _create_request(client)
# Check notification was sent on creation (to railiance since it was auto-routed)
r = await client.get("/messages/", params={"to_agent": "railiance"})
msgs = r.json()
assert any("[capability-request]" in m["subject"] for m in msgs)
# Accept
await client.post(f"/capability-requests/{req['id']}/accept", json={
"fulfilling_agent": "railiance-worker",
})
r = await client.get("/messages/", params={"to_agent": "net-kingdom-worker"})
msgs = r.json()
assert any("[capability-accepted]" in m["subject"] for m in msgs)
# ready_for_review
await client.patch(f"/capability-requests/{req['id']}/status", json={
"status": "in_progress",
})
await client.patch(f"/capability-requests/{req['id']}/status", json={
"status": "ready_for_review",
})
r = await client.get("/messages/", params={"to_agent": "net-kingdom-worker"})
msgs = r.json()
assert any("[capability-ready]" in m["subject"] for m in msgs)
async def test_list_filters_by_domain_status_type(self, client):
await _setup_two_domains(client)
await _create_request(client, title="Req A")
await _create_request(client, title="Req B", cap_type="security")
# Filter by type
r = await client.get("/capability-requests/", params={"capability_type": "security"})
assert len(r.json()) == 1
assert r.json()[0]["title"] == "Req B"
# Filter by status
r = await client.get("/capability-requests/", params={"status": "requested"})
assert len(r.json()) == 2
# Filter by domain
r = await client.get("/capability-requests/", params={"domain": "custodian"})
assert len(r.json()) == 2 # requesting domain is custodian for both
async def test_withdrawn_transition(self, client):
await _setup_two_domains(client)
req = await _create_request(client)
r = await client.patch(f"/capability-requests/{req['id']}/status", json={
"status": "withdrawn", "note": "No longer needed",
})
assert r.status_code == 200
assert r.json()["status"] == "withdrawn"
# Terminal — cannot transition further
r = await client.patch(f"/capability-requests/{req['id']}/status", json={
"status": "requested",
})
assert r.status_code == 422
class TestCapabilityRequestRouting:
async def test_word_boundary_avoids_substring_false_positive(self, client):
"""'postgres' keyword must not match inside 'postgresql'."""
await _setup_two_domains(client)
# Register two entries: PostgreSQL HA (keywords: postgres, ha) and k3s (keywords: k3s, cluster)
await _register_catalog(client, domain="railiance", cap_type="infrastructure",
title="PostgreSQL HA", keywords=["postgresql", "postgres", "ha"])
await _register_catalog(client, domain="custodian", cap_type="infrastructure",
title="K3s provisioning", keywords=["k3s", "cluster", "k8s"])
# Description mentions k8s and cluster but NOT standalone "postgres" or "ha"
req = await _create_request(
client,
title="K3s cluster access",
description="Need k8s foundations. cluster must be up. kubeconfig required.",
cap_type="infrastructure",
)
# k3s entry should win (k8s + cluster match), not postgres entry
assert req["catalog_entry_id"] is not None
# Routing note should mention k3s
assert req["routing_note"] is not None
assert "K3s" in req["routing_note"] or "k3s" in req["routing_note"].lower()
async def test_title_included_in_routing(self, client):
"""Title keywords should contribute to routing score."""
await _setup_two_domains(client)
await _register_catalog(client, domain="railiance", cap_type="infrastructure",
title="K3s cluster", keywords=["k3s", "cluster", "kubernetes"])
await _register_catalog(client, domain="custodian", cap_type="infrastructure",
title="Postgres DB", keywords=["postgresql", "postgres", "database"])
# Title contains "k3s" but description is generic
req = await _create_request(
client,
title="k3s cluster access needed",
description="Need access to proceed with deployment.",
cap_type="infrastructure",
)
assert req["routing_note"] is not None
assert req["catalog_entry_id"] is not None
# Should match k3s (title has "k3s" and "cluster")
# Verify via routing note
assert "K3s" in req["routing_note"] or "k3s" in req["routing_note"].lower()
async def test_ambiguous_routing_broadcasts(self, client):
"""Tied scores should broadcast (no fulfilling domain)."""
await _setup_two_domains(client)
await _register_catalog(client, domain="railiance", cap_type="infrastructure",
title="Entry A", keywords=["cluster"])
await _register_catalog(client, domain="custodian", cap_type="infrastructure",
title="Entry B", keywords=["cluster"])
req = await _create_request(
client,
title="cluster needed",
description="cluster access",
cap_type="infrastructure",
)
assert req["fulfilling_domain_slug"] is None
assert "ambiguous" in req["routing_note"]
async def test_patch_corrects_catalog_entry_and_reroutes(self, client):
"""PATCH /capability-requests/{id} corrects catalog_entry_id and re-derives domain."""
req_d, ful_d = await _setup_two_domains(client)
# Register wrong entry (postgres) and correct entry (k3s)
wrong = await _register_catalog(client, domain="railiance", cap_type="infrastructure",
title="PostgreSQL HA", keywords=["postgresql"])
correct = await _register_catalog(client, domain="custodian", cap_type="infrastructure",
title="K3s cluster", keywords=["k3s"])
# Create request — auto-routes to postgres (only postgres keyword matches "postgresql" in description)
req = await _create_request(
client,
description="Need postgresql and k3s access",
cap_type="infrastructure",
)
# Patch to correct catalog entry
r = await client.patch(f"/capability-requests/{req['id']}", json={
"catalog_entry_id": correct["id"],
})
assert r.status_code == 200
data = r.json()
assert data["catalog_entry_id"] == correct["id"]
assert data["fulfilling_domain_slug"] == "custodian"
assert "hub correction" in data["routing_note"]
assert "K3s" in data["routing_note"]
async def test_patch_unknown_catalog_entry_404(self, client):
await _setup_two_domains(client)
req = await _create_request(client, cap_type="security")
import uuid as _uuid
r = await client.patch(f"/capability-requests/{req['id']}", json={
"catalog_entry_id": str(_uuid.uuid4()),
})
assert r.status_code == 404
async def test_patch_priority(self, client):
await _setup_two_domains(client)
req = await _create_request(client, cap_type="security")
r = await client.patch(f"/capability-requests/{req['id']}", json={"priority": "critical"})
assert r.status_code == 200
assert r.json()["priority"] == "critical"
assert "priority" in r.json()["routing_note"]