Files
inter-hub/scripts/ops-hub-bootstrap-smoke.py
tegwick 4381768045
Some checks failed
Build and Deploy / build-push-deploy (push) Has been cancelled
test: add ops hub bootstrap smoke script
2026-05-19 02:49:14 +02:00

270 lines
8.1 KiB
Python
Executable File

#!/usr/bin/env python3
"""Smoke-test the v2 ops-hub bootstrap path.
Required environment:
IHUB_OPERATOR_KEY Existing operator/admin API key.
Optional environment:
IHUB_BASE Inter-Hub base URL. Default: http://127.0.0.1:8000
OPS_HUB_SLUG Hub slug to create or reuse. Default: ops-hub
OPS_HUB_NAME Hub display name. Default: Operations Hub
OPS_HUB_DOMAIN Hub domain. Default: operations
"""
from __future__ import annotations
import json
import os
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from typing import Any
BASE_URL = os.environ.get("IHUB_BASE", "http://127.0.0.1:8000").rstrip("/")
OPERATOR_KEY = os.environ.get("IHUB_OPERATOR_KEY", "")
HUB_SLUG = os.environ.get("OPS_HUB_SLUG", "ops-hub")
HUB_NAME = os.environ.get("OPS_HUB_NAME", "Operations Hub")
HUB_DOMAIN = os.environ.get("OPS_HUB_DOMAIN", "operations")
WIDGET_TYPE = "ops-endpoint-card"
EVENT_TYPE = "ops-endpoint-verified"
ANNOTATION_CATEGORY = "ops-risk"
POLICY_SCOPE = "ops-internal"
WIDGET_NAME = "CoulombCore Gitea Registry"
def main() -> int:
if not OPERATOR_KEY:
print("IHUB_OPERATOR_KEY is required", file=sys.stderr)
return 2
hub = ensure_hub()
manifest = ensure_manifest(hub["id"])
key_response = create_runtime_key(manifest["id"])
runtime_key = key_response["fullKey"]
widget = ensure_widget(runtime_key, hub["id"])
event = submit_event(runtime_key, widget["id"])
verify_event(runtime_key, widget["id"], event["id"])
print(json.dumps(
{
"ok": True,
"hubId": hub["id"],
"manifestId": manifest["id"],
"apiConsumerId": key_response["apiConsumer"]["id"],
"apiKeyPrefix": key_response["apiKey"]["keyPrefix"],
"widgetId": widget["id"],
"eventId": event["id"],
},
indent=2,
sort_keys=True,
))
return 0
def ensure_hub() -> dict[str, Any]:
existing = find_by(list_items("/api/v2/hubs", OPERATOR_KEY), "slug", HUB_SLUG)
if existing:
print(f"reusing hub {HUB_SLUG} ({existing['id']})", file=sys.stderr)
return existing
return request_json(
"POST",
"/api/v2/hubs",
OPERATOR_KEY,
{
"slug": HUB_SLUG,
"name": HUB_NAME,
"domain": HUB_DOMAIN,
"hubKind": "domain",
"hubFamily": "vsm",
"vsmFunction": "operations",
"vsmSystem": "1",
},
expected={201},
)
def ensure_manifest(hub_id: str) -> dict[str, Any]:
manifests = list_items(
"/api/v2/hub-capability-manifests?"
+ urllib.parse.urlencode({"hubId": hub_id}),
OPERATOR_KEY,
)
active = first(lambda item: item.get("status") == "active", manifests)
if active:
print(f"reusing active manifest {active['id']}", file=sys.stderr)
return active
body = {
"manifestVersion": "1.0",
"declaredWidgetTypes": [WIDGET_TYPE],
"declaredEventTypes": [EVENT_TYPE],
"declaredAnnotationCategories": [ANNOTATION_CATEGORY],
"declaredPolicyScopes": [POLICY_SCOPE],
"capabilityDescription": "Operations inventory and endpoint verification",
"contact": "ops@example.com",
}
draft = first(lambda item: item.get("status") == "draft", manifests)
if draft:
manifest = request_json(
"PATCH",
f"/api/v2/hub-capability-manifests/{draft['id']}",
OPERATOR_KEY,
body,
expected={200},
)
else:
manifest = request_json(
"POST",
"/api/v2/hub-capability-manifests",
OPERATOR_KEY,
{"hubId": hub_id, **body},
expected={201},
)
return request_json(
"POST",
f"/api/v2/hub-capability-manifests/{manifest['id']}/activate",
OPERATOR_KEY,
None,
expected={200},
)
def create_runtime_key(manifest_id: str) -> dict[str, Any]:
run_id = int(time.time())
consumer = request_json(
"POST",
"/api/v2/api-consumers",
OPERATOR_KEY,
{
"name": f"{HUB_SLUG}-smoke-{run_id}",
"description": "ops-hub bootstrap smoke test runtime client",
"hubCapabilityManifestId": manifest_id,
"rateLimitPerMinute": 120,
"quotaPerDay": 50000,
},
expected={201},
)
key_response = request_json(
"POST",
f"/api/v2/api-consumers/{consumer['id']}/api-keys",
OPERATOR_KEY,
{"scopes": "ops:write"},
expected={201},
)
if not key_response.get("fullKey"):
raise RuntimeError("api key creation did not return display-once fullKey")
return {"apiConsumer": consumer, **key_response}
def ensure_widget(runtime_key: str, hub_id: str) -> dict[str, Any]:
widgets = list_items("/api/v2/widgets", runtime_key)
existing = first(
lambda item: item.get("hubId") == hub_id and item.get("name") == WIDGET_NAME,
widgets,
)
if existing:
print(f"reusing widget {WIDGET_NAME} ({existing['id']})", file=sys.stderr)
return existing
return request_json(
"POST",
"/api/v2/widgets",
runtime_key,
{
"hubId": hub_id,
"name": WIDGET_NAME,
"widgetType": WIDGET_TYPE,
"viewContext": "operations-inventory",
"policyScope": POLICY_SCOPE,
"status": "active",
},
expected={201},
)
def submit_event(runtime_key: str, widget_id: str) -> dict[str, Any]:
return request_json(
"POST",
"/api/v2/interaction-events",
runtime_key,
{
"widgetId": widget_id,
"eventType": EVENT_TYPE,
"viewContext": "registry-readiness",
"metadata": {
"service": "gitea",
"endpoint": "https://gitea.coulomb.social/v2/",
"result": "auth-challenge-ok",
"smokeRunAt": int(time.time()),
},
},
expected={201},
)
def verify_event(runtime_key: str, widget_id: str, event_id: str) -> None:
query = urllib.parse.urlencode({"widgetId": widget_id, "eventType": EVENT_TYPE})
events = list_items(f"/api/v2/interaction-events?{query}", runtime_key)
if not any(item.get("id") == event_id for item in events):
raise RuntimeError(f"created event {event_id} was not returned by list endpoint")
def list_items(path: str, token: str) -> list[dict[str, Any]]:
response = request_json("GET", path, token, None, expected={200})
data = response.get("data", [])
if not isinstance(data, list):
raise RuntimeError(f"expected paginated data array from {path}")
return data
def request_json(
method: str,
path: str,
token: str,
body: dict[str, Any] | None,
*,
expected: set[int],
) -> dict[str, Any]:
data = json.dumps(body).encode("utf-8") if body is not None else None
request = urllib.request.Request(BASE_URL + path, data=data, method=method)
request.add_header("Authorization", f"Bearer {token}")
request.add_header("Accept", "application/json")
if body is not None:
request.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(request) as response:
status = response.status
payload = response.read().decode("utf-8")
except urllib.error.HTTPError as error:
payload = error.read().decode("utf-8")
raise RuntimeError(f"{method} {path} failed with HTTP {error.code}: {payload}") from error
if status not in expected:
raise RuntimeError(f"{method} {path} returned HTTP {status}, expected {sorted(expected)}: {payload}")
if not payload:
return {}
return json.loads(payload)
def find_by(items: list[dict[str, Any]], key: str, value: Any) -> dict[str, Any] | None:
return first(lambda item: item.get(key) == value, items)
def first(predicate, items):
for item in items:
if predicate(item):
return item
return None
if __name__ == "__main__":
raise SystemExit(main())