Files
state-hub/tests/test_services_router.py
tegwick ce82ada0fa STATE-WP-0062 T5: docs, first-party↔repo test, mark workplan finished
- Add /docs/services reference (two-dimension model, persistence, API) and a
  pointer note from /docs/tpsc; add it to the Reference nav.
- Add a test asserting first_party.repo_slug resolves to a managed_repos FK
  (8 services tests green).
- Mark STATE-WP-0062 tasks done / status finished.

Known classes seeded in the live catalog via the API (Gitea, Postgres as
self-hosted/third-party; State Hub as self-hosted/first-party at Level 2).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-19 21:16:37 +02:00

109 lines
4.4 KiB
Python

"""Tests for the two-dimension service catalog API (STATE-WP-0062)."""
from __future__ import annotations
def _svc(slug, hosting, development, **extra):
body = {
"slug": slug,
"name": slug.replace("-", " ").title(),
"hosting_type": hosting,
"development_type": development,
}
body.update(extra)
return body
async def test_upsert_cloud_third_party_with_extensions(client):
    """Upsert a cloud-hosted third-party service and check the echoed body.

    The response must carry the submitted ``cloud`` and ``third_party``
    extension values, and ``first_party`` / ``self_hosted`` must be ``None``.
    """
    payload = _svc(
        "openai-api",
        "cloud_hosted",
        "third_party",
        owner_or_provider="OpenAI",
        cloud={"gdpr_maturity": "developing", "dpa_available": True},
        third_party={
            "pricing_model": "usage_based",
            "support_url": "https://help.openai.com",
        },
    )
    response = await client.post("/services/catalog", json=payload)
    assert response.status_code == 201, response.text

    service = response.json()
    # Both dimensions are echoed back as submitted.
    assert service["hosting_type"] == "cloud_hosted"
    assert service["development_type"] == "third_party"
    # Extensions that were sent are present in the response ...
    assert service["cloud"]["gdpr_maturity"] == "developing"
    assert service["cloud"]["dpa_available"] is True
    assert service["third_party"]["pricing_model"] == "usage_based"
    # ... and the ones that were not sent come back as None.
    assert service["first_party"] is None
    assert service["self_hosted"] is None
async def test_upsert_self_hosted_first_party_with_maturity(client):
    """Upsert a self-hosted first-party service that carries a maturity level.

    The response must echo ``maturity_level`` and the submitted
    ``first_party`` / ``self_hosted`` values, with ``cloud`` left as ``None``.
    """
    payload = _svc(
        "state-hub",
        "self_hosted",
        "first_party",
        maturity_level=2,
        first_party={"owning_domain": "custodian"},
        self_hosted={"helix_instance": "coulombcore", "runbook_ref": "make api"},
    )
    response = await client.post("/services/catalog", json=payload)
    assert response.status_code == 201, response.text

    service = response.json()
    assert service["maturity_level"] == 2
    assert service["first_party"]["owning_domain"] == "custodian"
    assert service["self_hosted"]["helix_instance"] == "coulombcore"
    # No cloud extension was submitted for this service.
    assert service["cloud"] is None
async def test_dimension_filters(client):
    """Listing the catalog can be filtered by each dimension and by maturity.

    Seeds three services, then checks the ``hosting_type``,
    ``development_type`` and ``maturity_level`` query filters one by one.

    NOTE(review): the exact-set comparisons presume the catalog holds no
    other services when this test runs; fixture isolation is not visible
    from this file.
    """
    seeds = [
        _svc("gitea", "self_hosted", "third_party",
             self_hosted={"upstream_oss_project": "Gitea"}),
        _svc("stripe", "cloud_hosted", "third_party"),
        _svc("hub", "self_hosted", "first_party", maturity_level=3),
    ]
    for seed in seeds:
        created = await client.post("/services/catalog", json=seed)
        # Fail here, with the server's message, if seeding goes wrong;
        # otherwise the failure surfaces later as a misleading set mismatch.
        assert created.status_code == 201, created.text

    self_hosted = (await client.get("/services/catalog?hosting_type=self_hosted")).json()
    assert {s["slug"] for s in self_hosted} == {"gitea", "hub"}

    first_party = (await client.get("/services/catalog?development_type=first_party")).json()
    assert {s["slug"] for s in first_party} == {"hub"}

    level3 = (await client.get("/services/catalog?maturity_level=3")).json()
    assert {s["slug"] for s in level3} == {"hub"}
async def test_upsert_is_idempotent_on_slug(client):
    """Posting the same slug twice updates the entry instead of duplicating it.

    The second upsert changes ``category``; the response must reflect the new
    value and the listing must contain exactly one entry for the slug.
    """
    first = await client.post(
        "/services/catalog",
        json=_svc("svc-x", "cloud_hosted", "third_party", category="search"),
    )
    # Without this check a failed first upsert would go unnoticed and the
    # test would silently degrade into a plain single-create test.
    assert first.status_code == 201, first.text

    second = await client.post(
        "/services/catalog",
        json=_svc("svc-x", "cloud_hosted", "third_party", category="storage"),
    )
    assert second.status_code == 201, second.text
    assert second.json()["category"] == "storage"

    listed = (await client.get("/services/catalog")).json()
    assert sum(1 for s in listed if s["slug"] == "svc-x") == 1
async def test_invalid_dimensions_rejected(client):
    """Payloads with an unrecognised dimension value are answered with 422.

    Two cases are posted in turn: ``on_prem`` as the hosting type and
    ``nobody`` as the development type.
    """
    rejected_payloads = (
        _svc("bad", "on_prem", "third_party"),
        _svc("bad2", "cloud_hosted", "nobody"),
    )
    for payload in rejected_payloads:
        response = await client.post("/services/catalog", json=payload)
        assert response.status_code == 422
async def test_first_party_unknown_repo_slug_404(client):
    """A first-party ``repo_slug`` of ``does-not-exist`` is answered with 404."""
    payload = _svc(
        "svc-y",
        "self_hosted",
        "first_party",
        first_party={"repo_slug": "does-not-exist"},
    )
    response = await client.post("/services/catalog", json=payload)
    assert response.status_code == 404
async def test_get_unknown_service_404(client):
    """Requesting ``/services/nope`` is answered with 404."""
    response = await client.get("/services/nope")
    assert response.status_code == 404
async def test_first_party_repo_slug_links_to_repo(client):
    """A first-party ``repo_slug`` is resolved to the id of the matching repo.

    Creates a domain and a repo, then upserts a first-party service that
    references the repo by slug; the response's ``first_party.repo_id`` must
    equal the ``id`` returned when the repo was created.
    """
    domain_payload = {"slug": "custodian", "name": "Custodian"}
    await client.post("/domains/", json=domain_payload)

    repo_payload = {
        "domain_slug": "custodian", "slug": "state-hub", "name": "State Hub",
    }
    repo_response = await client.post("/repos/", json=repo_payload)
    repo = repo_response.json()

    service_payload = _svc(
        "state-hub-api",
        "self_hosted",
        "first_party",
        maturity_level=2,
        first_party={"repo_slug": "state-hub", "owning_domain": "custodian"},
    )
    response = await client.post("/services/catalog", json=service_payload)
    assert response.status_code == 201, response.text
    assert response.json()["first_party"]["repo_id"] == repo["id"]