#!/usr/bin/env python3
"""Smoke-test the v2 ops-hub bootstrap path.

The script creates (or reuses) a hub, activates a capability manifest for it,
mints a runtime API key, creates (or reuses) a widget, submits one interaction
event, and finally checks that the event is returned by the list endpoint.
A JSON summary is printed to stdout; progress notes go to stderr.

Required environment:
  IHUB_OPERATOR_KEY  Existing operator/admin API key.

Optional environment:
  IHUB_BASE          Inter-Hub base URL. Default: http://127.0.0.1:8000
  OPS_HUB_SLUG       Hub slug to create or reuse. Default: ops-hub
  OPS_HUB_NAME       Hub display name. Default: Operations Hub
  OPS_HUB_DOMAIN     Hub domain. Default: operations
"""

from __future__ import annotations

import json
import os
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from typing import Any

BASE_URL = os.environ.get("IHUB_BASE", "http://127.0.0.1:8000").rstrip("/")
OPERATOR_KEY = os.environ.get("IHUB_OPERATOR_KEY", "")
HUB_SLUG = os.environ.get("OPS_HUB_SLUG", "ops-hub")
HUB_NAME = os.environ.get("OPS_HUB_NAME", "Operations Hub")
HUB_DOMAIN = os.environ.get("OPS_HUB_DOMAIN", "operations")

WIDGET_TYPE = "ops-endpoint-card"
EVENT_TYPE = "ops-endpoint-verified"
ANNOTATION_CATEGORY = "ops-risk"
POLICY_SCOPE = "ops-internal"
WIDGET_NAME = "CoulombCore Gitea Registry"

# Upper bound (seconds) for each blocking socket operation of a request, so a
# stalled server fails the smoke test instead of hanging it forever.
REQUEST_TIMEOUT_SECONDS = 30.0


def main() -> int:
    """Run the full bootstrap smoke test.

    Returns:
        2 when IHUB_OPERATOR_KEY is not set, 0 after every step succeeded.

    Raises:
        RuntimeError: propagated from any failing step (see ``request_json``).
    """
    if not OPERATOR_KEY:
        print("IHUB_OPERATOR_KEY is required", file=sys.stderr)
        return 2

    hub = ensure_hub()
    manifest = ensure_manifest(hub["id"])
    key_response = create_runtime_key(manifest["id"])
    # From here on the freshly minted runtime key is used, not the operator key.
    runtime_key = key_response["fullKey"]
    widget = ensure_widget(runtime_key, hub["id"])
    event = submit_event(runtime_key, widget["id"])
    verify_event(runtime_key, widget["id"], event["id"])

    print(json.dumps(
        {
            "ok": True,
            "hubId": hub["id"],
            "manifestId": manifest["id"],
            "apiConsumerId": key_response["apiConsumer"]["id"],
            "apiKeyPrefix": key_response["apiKey"]["keyPrefix"],
            "widgetId": widget["id"],
            "eventId": event["id"],
        },
        indent=2,
        sort_keys=True,
    ))
    return 0


def ensure_hub() -> dict[str, Any]:
    """Return the hub whose slug is HUB_SLUG, creating it when it is not listed."""
    # NOTE(review): the listing is requested without a token while the create
    # call uses the operator key -- presumably the list endpoint is public;
    # confirm against the API.
    # NOTE(review): only the items returned by a single list call are
    # searched; if the endpoint paginates, an existing hub on a later page
    # would be missed -- confirm.
    existing = find_by(list_items("/api/v2/hubs", None), "slug", HUB_SLUG)
    if existing:
        print(f"reusing hub {HUB_SLUG} ({existing['id']})", file=sys.stderr)
        return existing

    return request_json(
        "POST",
        "/api/v2/hubs",
        OPERATOR_KEY,
        {
            "slug": HUB_SLUG,
            "name": HUB_NAME,
            "domain": HUB_DOMAIN,
            "hubKind": "domain",
            "hubFamily": "vsm",
            "vsmFunction": "operations",
            "vsmSystem": "1",
        },
        expected={201},
    )


def ensure_manifest(hub_id: str) -> dict[str, Any]:
    """Return an active capability manifest for ``hub_id``.

    An already active manifest is reused as-is. Otherwise an existing draft is
    patched with the smoke-test declarations (or a new manifest is created)
    and then activated; the activation response is returned.
    """
    manifests = list_items(
        "/api/v2/hub-capability-manifests?" + urllib.parse.urlencode({"hubId": hub_id}),
        OPERATOR_KEY,
    )
    active = first(lambda item: item.get("status") == "active", manifests)
    if active:
        print(f"reusing active manifest {active['id']}", file=sys.stderr)
        return active

    body = {
        "manifestVersion": "1.0",
        "declaredWidgetTypes": [WIDGET_TYPE],
        "declaredEventTypes": [EVENT_TYPE],
        "declaredAnnotationCategories": [ANNOTATION_CATEGORY],
        "declaredPolicyScopes": [POLICY_SCOPE],
        "capabilityDescription": "Operations inventory and endpoint verification",
        "contact": "ops@example.com",
    }

    draft = first(lambda item: item.get("status") == "draft", manifests)
    if draft:
        manifest = request_json(
            "PATCH",
            f"/api/v2/hub-capability-manifests/{draft['id']}",
            OPERATOR_KEY,
            body,
            expected={200},
        )
    else:
        manifest = request_json(
            "POST",
            "/api/v2/hub-capability-manifests",
            OPERATOR_KEY,
            {"hubId": hub_id, **body},
            expected={201},
        )

    return request_json(
        "POST",
        f"/api/v2/hub-capability-manifests/{manifest['id']}/activate",
        OPERATOR_KEY,
        None,
        expected={200},
    )


def create_runtime_key(manifest_id: str) -> dict[str, Any]:
    """Create a fresh API consumer bound to ``manifest_id`` and mint a key for it.

    Returns:
        The key-creation response with the consumer added under "apiConsumer".

    Raises:
        RuntimeError: when the key response carries no "fullKey".
    """
    # The epoch second keeps the consumer name distinct between runs.
    run_id = int(time.time())
    consumer = request_json(
        "POST",
        "/api/v2/api-consumers",
        OPERATOR_KEY,
        {
            "name": f"{HUB_SLUG}-smoke-{run_id}",
            "description": "ops-hub bootstrap smoke test runtime client",
            "hubCapabilityManifestId": manifest_id,
            "rateLimitPerMinute": 120,
            "quotaPerDay": 50000,
        },
        expected={201},
    )
    key_response = request_json(
        "POST",
        f"/api/v2/api-consumers/{consumer['id']}/api-keys",
        OPERATOR_KEY,
        {"scopes": "ops:write"},
        expected={201},
    )
    if not key_response.get("fullKey"):
        raise RuntimeError("api key creation did not return display-once fullKey")
    return {"apiConsumer": consumer, **key_response}


def ensure_widget(runtime_key: str, hub_id: str) -> dict[str, Any]:
    """Return the WIDGET_NAME widget of ``hub_id``, creating it when not listed."""
    widgets = list_items("/api/v2/widgets", runtime_key)
    existing = first(
        lambda item: item.get("hubId") == hub_id and item.get("name") == WIDGET_NAME,
        widgets,
    )
    if existing:
        print(f"reusing widget {WIDGET_NAME} ({existing['id']})", file=sys.stderr)
        return existing

    return request_json(
        "POST",
        "/api/v2/widgets",
        runtime_key,
        {
            "hubId": hub_id,
            "name": WIDGET_NAME,
            "widgetType": WIDGET_TYPE,
            "viewContext": "operations-inventory",
            "policyScope": POLICY_SCOPE,
            "status": "active",
        },
        expected={201},
    )


def submit_event(runtime_key: str, widget_id: str) -> dict[str, Any]:
    """Post one EVENT_TYPE interaction event for ``widget_id`` and return it."""
    return request_json(
        "POST",
        "/api/v2/interaction-events",
        runtime_key,
        {
            "widgetId": widget_id,
            "eventType": EVENT_TYPE,
            "viewContext": "registry-readiness",
            "metadata": {
                "service": "gitea",
                "endpoint": "https://gitea.coulomb.social/v2/",
                "result": "auth-challenge-ok",
                "smokeRunAt": int(time.time()),
            },
        },
        expected={201},
    )


def verify_event(runtime_key: str, widget_id: str, event_id: str) -> None:
    """Check that ``event_id`` is returned when listing the widget's events.

    Raises:
        RuntimeError: when the event is absent from the listing.
    """
    query = urllib.parse.urlencode({"widgetId": widget_id, "eventType": EVENT_TYPE})
    events = list_items(f"/api/v2/interaction-events?{query}", runtime_key)
    if not any(item.get("id") == event_id for item in events):
        raise RuntimeError(f"created event {event_id} was not returned by list endpoint")


def list_items(path: str, token: str | None) -> list[dict[str, Any]]:
    """GET ``path`` and return the list stored under the response's "data" key.

    A response without a "data" key yields an empty list.

    Raises:
        RuntimeError: when "data" is present but is not a list.
    """
    response = request_json("GET", path, token, None, expected={200})
    data = response.get("data", [])
    if not isinstance(data, list):
        raise RuntimeError(f"expected paginated data array from {path}")
    return data


def parse_json_payload(method: str, path: str, payload: str) -> Any:
    """Decode a response body, mapping an empty body to ``{}``.

    ``method`` and ``path`` are only used to give the error message context.

    Raises:
        RuntimeError: when ``payload`` is non-empty but is not valid JSON.
    """
    if not payload:
        return {}
    try:
        return json.loads(payload)
    except json.JSONDecodeError as error:
        raise RuntimeError(f"{method} {path} returned a non-JSON body: {payload}") from error


def request_json(
    method: str,
    path: str,
    token: str | None,
    body: dict[str, Any] | None,
    *,
    expected: set[int],
) -> dict[str, Any]:
    """Send one JSON request to BASE_URL + ``path`` and return the decoded reply.

    Args:
        method: HTTP method.
        path: Path (and optional query string) appended to BASE_URL.
        token: Bearer token; ``None`` sends no Authorization header.
        body: JSON-serialisable request body; ``None`` sends no body.
        expected: Status codes accepted as success.

    Returns:
        The decoded JSON response, or ``{}`` when the response body is empty.

    Raises:
        RuntimeError: on an HTTP error status, a transport failure or timeout,
            a status outside ``expected``, or a body that is not valid JSON.
    """
    data = json.dumps(body).encode("utf-8") if body is not None else None
    request = urllib.request.Request(BASE_URL + path, data=data, method=method)
    if token is not None:
        request.add_header("Authorization", f"Bearer {token}")
    request.add_header("Accept", "application/json")
    if body is not None:
        request.add_header("Content-Type", "application/json")

    try:
        with urllib.request.urlopen(request, timeout=REQUEST_TIMEOUT_SECONDS) as response:
            status = response.status
            payload = response.read().decode("utf-8")
    except urllib.error.HTTPError as error:
        # Decode leniently: a non-UTF-8 error page must not hide the HTTP failure.
        payload = error.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"{method} {path} failed with HTTP {error.code}: {payload}") from error
    except OSError as error:
        # URLError and socket timeouts are OSError subclasses; HTTPError is
        # handled above, so this branch only sees transport-level failures.
        raise RuntimeError(f"{method} {path} failed: {error}") from error

    if status not in expected:
        raise RuntimeError(
            f"{method} {path} returned HTTP {status}, expected {sorted(expected)}: {payload}"
        )
    return parse_json_payload(method, path, payload)


def find_by(items: list[dict[str, Any]], key: str, value: Any) -> dict[str, Any] | None:
    """Return the first item whose ``key`` entry equals ``value``, else ``None``."""
    return first(lambda item: item.get(key) == value, items)


def first(predicate, items):
    """Return the first element of ``items`` satisfying ``predicate``, else ``None``."""
    for item in items:
        if predicate(item):
            return item
    return None


if __name__ == "__main__":
    raise SystemExit(main())