generated from coulomb/repo-seed
feat: add interhub bootstrap helper
This commit is contained in:
368
scripts/ops-hub-bootstrap-api.py
Normal file
368
scripts/ops-hub-bootstrap-api.py
Normal file
@@ -0,0 +1,368 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Bootstrap ops-hub in Inter-Hub using the prepared ops-hub seeds.
|
||||
|
||||
The script never prints full API keys. The operator key is read from
|
||||
IHUB_OPERATOR_KEY or IHUB_OPERATOR_KEY_FILE. If an ops-hub runtime key is
|
||||
created, the full key is written to a 0600 temp file and only the file path and
|
||||
key prefix are printed.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import stat
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
import urllib.error
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
|
||||
DEFAULT_BASE = "https://hub.coulomb.social"
|
||||
ROOT = Path(__file__).resolve().parents[1]
|
||||
MANIFEST_PATH = ROOT / "seeds" / "ops-hub-manifest.draft.json"
|
||||
WIDGETS_PATH = ROOT / "seeds" / "ops-hub-widgets.seed.json"
|
||||
|
||||
|
||||
class BootstrapError(RuntimeError):
|
||||
pass
|
||||
|
||||
|
||||
def load_secret(name: str, file_name: str) -> str:
|
||||
value = os.environ.get(name, "").strip()
|
||||
if value:
|
||||
return value
|
||||
file_path = os.environ.get(file_name, "").strip()
|
||||
if file_path:
|
||||
return Path(file_path).read_text(encoding="utf-8").strip()
|
||||
return ""
|
||||
|
||||
|
||||
def request_json(
|
||||
base_url: str,
|
||||
method: str,
|
||||
path: str,
|
||||
token: str | None,
|
||||
body: dict[str, Any] | None,
|
||||
*,
|
||||
expected: set[int],
|
||||
) -> dict[str, Any]:
|
||||
data = json.dumps(body).encode("utf-8") if body is not None else None
|
||||
request = urllib.request.Request(base_url + path, data=data, method=method)
|
||||
request.add_header("Accept", "application/json")
|
||||
request.add_header("User-Agent", "ops-hub-bootstrap/0.1")
|
||||
if token:
|
||||
request.add_header("Authorization", f"Bearer {token}")
|
||||
if body is not None:
|
||||
request.add_header("Content-Type", "application/json")
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen(request, timeout=30) as response:
|
||||
status = response.status
|
||||
payload = response.read().decode("utf-8")
|
||||
except urllib.error.HTTPError as error:
|
||||
payload = error.read().decode("utf-8", errors="replace")
|
||||
raise BootstrapError(f"{method} {path} failed with HTTP {error.code}: {payload}") from error
|
||||
|
||||
if status not in expected:
|
||||
raise BootstrapError(f"{method} {path} returned HTTP {status}, expected {sorted(expected)}")
|
||||
if not payload:
|
||||
return {}
|
||||
return json.loads(payload)
|
||||
|
||||
|
||||
def list_items(base_url: str, path: str, token: str | None) -> list[dict[str, Any]]:
|
||||
response = request_json(base_url, "GET", path, token, None, expected={200})
|
||||
data = response.get("data", [])
|
||||
if not isinstance(data, list):
|
||||
raise BootstrapError(f"expected paginated data array from {path}")
|
||||
return data
|
||||
|
||||
|
||||
def first_by(items: list[dict[str, Any]], key: str, value: Any) -> dict[str, Any] | None:
|
||||
return next((item for item in items if item.get(key) == value), None)
|
||||
|
||||
|
||||
def load_manifest() -> dict[str, Any]:
|
||||
manifest = json.loads(MANIFEST_PATH.read_text(encoding="utf-8"))
|
||||
required = [
|
||||
"hub",
|
||||
"manifestVersion",
|
||||
"declaredWidgetTypes",
|
||||
"declaredEventTypes",
|
||||
"declaredAnnotationCategories",
|
||||
"declaredPolicyScopes",
|
||||
]
|
||||
missing = [key for key in required if key not in manifest]
|
||||
if missing:
|
||||
raise BootstrapError(f"{MANIFEST_PATH} missing required key(s): {', '.join(missing)}")
|
||||
return manifest
|
||||
|
||||
|
||||
def load_widgets() -> list[dict[str, Any]]:
|
||||
widgets = json.loads(WIDGETS_PATH.read_text(encoding="utf-8"))
|
||||
if not isinstance(widgets, list):
|
||||
raise BootstrapError(f"{WIDGETS_PATH} must contain a JSON array")
|
||||
return widgets
|
||||
|
||||
|
||||
def ensure_hub(base_url: str, operator_key: str, manifest: dict[str, Any]) -> dict[str, Any]:
|
||||
hub_seed = manifest["hub"]
|
||||
slug = hub_seed["slug"]
|
||||
existing = first_by(list_items(base_url, "/api/v2/hubs", operator_key), "slug", slug)
|
||||
if existing:
|
||||
return {"record": existing, "created": False}
|
||||
|
||||
body = {
|
||||
"slug": slug,
|
||||
"name": hub_seed["name"],
|
||||
"domain": hub_seed["domain"],
|
||||
"hubKind": hub_seed.get("hubKind", "domain"),
|
||||
"hubFamily": "vsm",
|
||||
"vsmFunction": "OPS",
|
||||
"vsmSystem": "1",
|
||||
}
|
||||
record = request_json(base_url, "POST", "/api/v2/hubs", operator_key, body, expected={201})
|
||||
return {"record": record, "created": True}
|
||||
|
||||
|
||||
def manifest_body(manifest: dict[str, Any], hub_id: str | None = None) -> dict[str, Any]:
|
||||
body: dict[str, Any] = {
|
||||
"manifestVersion": manifest["manifestVersion"],
|
||||
"declaredWidgetTypes": manifest["declaredWidgetTypes"],
|
||||
"declaredEventTypes": manifest["declaredEventTypes"],
|
||||
"declaredAnnotationCategories": manifest["declaredAnnotationCategories"],
|
||||
"declaredPolicyScopes": manifest["declaredPolicyScopes"],
|
||||
"capabilityDescription": manifest.get("capabilityDescription", ""),
|
||||
"contact": manifest.get("contact", "operator"),
|
||||
}
|
||||
if hub_id:
|
||||
body["hubId"] = hub_id
|
||||
return body
|
||||
|
||||
|
||||
def ensure_manifest(base_url: str, operator_key: str, hub_id: str, manifest: dict[str, Any]) -> dict[str, Any]:
|
||||
query = urllib.parse.urlencode({"hubId": hub_id})
|
||||
manifests = list_items(base_url, f"/api/v2/hub-capability-manifests?{query}", operator_key)
|
||||
active = first_by(manifests, "status", "active")
|
||||
if active:
|
||||
return {"record": active, "created": False, "activated": False}
|
||||
|
||||
draft = first_by(manifests, "status", "draft")
|
||||
if draft:
|
||||
record = request_json(
|
||||
base_url,
|
||||
"PATCH",
|
||||
f"/api/v2/hub-capability-manifests/{draft['id']}",
|
||||
operator_key,
|
||||
manifest_body(manifest),
|
||||
expected={200},
|
||||
)
|
||||
created = False
|
||||
else:
|
||||
record = request_json(
|
||||
base_url,
|
||||
"POST",
|
||||
"/api/v2/hub-capability-manifests",
|
||||
operator_key,
|
||||
manifest_body(manifest, hub_id),
|
||||
expected={201},
|
||||
)
|
||||
created = True
|
||||
|
||||
activated = request_json(
|
||||
base_url,
|
||||
"POST",
|
||||
f"/api/v2/hub-capability-manifests/{record['id']}/activate",
|
||||
operator_key,
|
||||
None,
|
||||
expected={200},
|
||||
)
|
||||
return {"record": activated, "created": created, "activated": True}
|
||||
|
||||
|
||||
def ensure_api_consumer(base_url: str, operator_key: str, manifest_id: str) -> dict[str, Any]:
|
||||
consumers = list_items(base_url, "/api/v2/api-consumers", operator_key)
|
||||
existing = first_by(consumers, "name", "ops-hub")
|
||||
if existing:
|
||||
return {"record": existing, "created": False}
|
||||
|
||||
record = request_json(
|
||||
base_url,
|
||||
"POST",
|
||||
"/api/v2/api-consumers",
|
||||
operator_key,
|
||||
{
|
||||
"name": "ops-hub",
|
||||
"description": "API consumer for the VSM Operations hub",
|
||||
"hubCapabilityManifestId": manifest_id,
|
||||
"rateLimitPerMinute": 120,
|
||||
"quotaPerDay": 50000,
|
||||
},
|
||||
expected={201},
|
||||
)
|
||||
return {"record": record, "created": True}
|
||||
|
||||
|
||||
def write_runtime_key(full_key: str, output_path: str | None) -> Path:
|
||||
if output_path:
|
||||
path = Path(output_path)
|
||||
path.write_text(full_key, encoding="utf-8")
|
||||
else:
|
||||
fd, raw_path = tempfile.mkstemp(prefix="ops-hub-runtime-key-", text=True)
|
||||
path = Path(raw_path)
|
||||
with os.fdopen(fd, "w", encoding="utf-8") as handle:
|
||||
handle.write(full_key)
|
||||
path.chmod(stat.S_IRUSR | stat.S_IWUSR)
|
||||
return path
|
||||
|
||||
|
||||
def ensure_runtime_key(
|
||||
base_url: str,
|
||||
operator_key: str,
|
||||
api_consumer_id: str,
|
||||
output_path: str | None,
|
||||
) -> dict[str, Any]:
|
||||
existing_runtime_key = load_secret("OPS_HUB_KEY", "OPS_HUB_KEY_FILE")
|
||||
if existing_runtime_key:
|
||||
return {
|
||||
"created": False,
|
||||
"token": existing_runtime_key,
|
||||
"keyPrefix": existing_runtime_key[:8],
|
||||
"keyFile": os.environ.get("OPS_HUB_KEY_FILE"),
|
||||
}
|
||||
|
||||
response = request_json(
|
||||
base_url,
|
||||
"POST",
|
||||
f"/api/v2/api-consumers/{api_consumer_id}/api-keys",
|
||||
operator_key,
|
||||
{"scopes": "framework:read hub:ops-hub:read hub:ops-hub:write"},
|
||||
expected={201},
|
||||
)
|
||||
full_key = response.get("fullKey")
|
||||
if not full_key:
|
||||
raise BootstrapError("api key creation did not return display-once fullKey")
|
||||
key_file = write_runtime_key(full_key, output_path)
|
||||
api_key = response.get("apiKey") or {}
|
||||
return {
|
||||
"created": True,
|
||||
"token": full_key,
|
||||
"keyPrefix": api_key.get("keyPrefix", full_key[:8]),
|
||||
"keyFile": str(key_file),
|
||||
}
|
||||
|
||||
|
||||
def ensure_widgets(base_url: str, runtime_key: str, hub_id: str, widget_seeds: list[dict[str, Any]]) -> dict[str, Any]:
|
||||
existing_widgets = list_items(base_url, "/api/v2/widgets", runtime_key)
|
||||
existing_by_ref = {
|
||||
widget.get("capabilityRef"): widget
|
||||
for widget in existing_widgets
|
||||
if widget.get("hubId") == hub_id and widget.get("capabilityRef")
|
||||
}
|
||||
created: list[dict[str, Any]] = []
|
||||
reused: list[dict[str, Any]] = []
|
||||
for seed in widget_seeds:
|
||||
existing = existing_by_ref.get(seed.get("capabilityRef"))
|
||||
if existing:
|
||||
reused.append(existing)
|
||||
continue
|
||||
body = {"hubId": hub_id, "status": "active", **seed}
|
||||
created.append(request_json(base_url, "POST", "/api/v2/widgets", runtime_key, body, expected={201}))
|
||||
return {"created": created, "reused": reused}
|
||||
|
||||
|
||||
def submit_gitea_event(base_url: str, runtime_key: str, widgets: dict[str, Any]) -> dict[str, Any] | None:
|
||||
all_widgets = widgets["created"] + widgets["reused"]
|
||||
readiness = next(
|
||||
(widget for widget in all_widgets if widget.get("capabilityRef") == "ops:readiness:gitea-registry"),
|
||||
None,
|
||||
)
|
||||
if not readiness:
|
||||
return None
|
||||
return request_json(
|
||||
base_url,
|
||||
"POST",
|
||||
"/api/v2/interaction-events",
|
||||
runtime_key,
|
||||
{
|
||||
"widgetId": readiness["id"],
|
||||
"eventType": "ops-endpoint-verified",
|
||||
"viewContext": "railiance-apps/workplans/RAIL-AP-WP-0001",
|
||||
"metadata": {
|
||||
"vsmFunction": "OPS",
|
||||
"vsmSystem": "S1",
|
||||
"endpoint": "https://gitea.coulomb.social/v2/",
|
||||
"expectedStatus": 401,
|
||||
"observedHeader": "Docker-Distribution-Api-Version: registry/2.0",
|
||||
"recordedBy": "ops-hub/scripts/ops-hub-bootstrap-api.py",
|
||||
"recordedAt": int(time.time()),
|
||||
},
|
||||
},
|
||||
expected={201},
|
||||
)
|
||||
|
||||
|
||||
def build_parser() -> argparse.ArgumentParser:
|
||||
parser = argparse.ArgumentParser(description=__doc__)
|
||||
parser.add_argument("--base", default=os.environ.get("IHUB_BASE", DEFAULT_BASE).rstrip("/"))
|
||||
parser.add_argument("--runtime-key-output", default=os.environ.get("OPS_HUB_RUNTIME_KEY_OUTPUT"))
|
||||
parser.add_argument("--skip-event", action="store_true", help="Do not submit the initial Gitea readiness event")
|
||||
return parser
|
||||
|
||||
|
||||
def main() -> int:
|
||||
args = build_parser().parse_args()
|
||||
operator_key = load_secret("IHUB_OPERATOR_KEY", "IHUB_OPERATOR_KEY_FILE")
|
||||
if not operator_key:
|
||||
print("ERROR: set IHUB_OPERATOR_KEY or IHUB_OPERATOR_KEY_FILE", file=sys.stderr)
|
||||
return 2
|
||||
|
||||
manifest = load_manifest()
|
||||
widget_seeds = load_widgets()
|
||||
|
||||
hub = ensure_hub(args.base, operator_key, manifest)
|
||||
hub_record = hub["record"]
|
||||
manifest_result = ensure_manifest(args.base, operator_key, hub_record["id"], manifest)
|
||||
manifest_record = manifest_result["record"]
|
||||
consumer = ensure_api_consumer(args.base, operator_key, manifest_record["id"])
|
||||
runtime_key = ensure_runtime_key(args.base, operator_key, consumer["record"]["id"], args.runtime_key_output)
|
||||
widgets = ensure_widgets(args.base, runtime_key["token"], hub_record["id"], widget_seeds)
|
||||
event = None if args.skip_event else submit_gitea_event(args.base, runtime_key["token"], widgets)
|
||||
|
||||
summary = {
|
||||
"ok": True,
|
||||
"base": args.base,
|
||||
"hub": {"id": hub_record["id"], "slug": hub_record["slug"], "created": hub["created"]},
|
||||
"manifest": {
|
||||
"id": manifest_record["id"],
|
||||
"status": manifest_record["status"],
|
||||
"created": manifest_result["created"],
|
||||
"activated": manifest_result["activated"],
|
||||
},
|
||||
"apiConsumer": {"id": consumer["record"]["id"], "name": consumer["record"]["name"], "created": consumer["created"]},
|
||||
"runtimeKey": {
|
||||
"created": runtime_key["created"],
|
||||
"keyPrefix": runtime_key["keyPrefix"],
|
||||
"keyFile": runtime_key["keyFile"],
|
||||
"storeImmediatelyInOpenBao": "platform/operators/ops-hub/runtime",
|
||||
"field": "OPS_HUB_KEY",
|
||||
},
|
||||
"widgets": {"created": len(widgets["created"]), "reused": len(widgets["reused"])},
|
||||
"event": None if event is None else {"id": event["id"], "eventType": event["eventType"], "widgetId": event["widgetId"]},
|
||||
}
|
||||
print(json.dumps(summary, indent=2, sort_keys=True))
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
raise SystemExit(main())
|
||||
except BootstrapError as exc:
|
||||
print(f"ERROR: {exc}", file=sys.stderr)
|
||||
raise SystemExit(1)
|
||||
Reference in New Issue
Block a user