generated from coulomb/repo-seed
Some checks failed
ci / validate-registry (push) Has been cancelled
Add FederationHubAPI spec, hub registration schema, FastAPI hub with SQLite persistence, reuse-surface hub CLI client, Dockerfile, and hub tests. Activate workplan; T05 deploy and T06 ops docs remain open pending railiance01 cutover.
86 lines
2.6 KiB
Python
86 lines
2.6 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import urllib.error
|
|
import urllib.request
|
|
from typing import Any
|
|
|
|
|
|
def hub_base_url(explicit: str | None = None) -> str:
    """Resolve the hub base URL, without any trailing slashes.

    Args:
        explicit: URL supplied by the caller. When it is ``None`` or empty,
            the ``REUSE_SURFACE_HUB_URL`` environment variable is used.

    Returns:
        The chosen URL with every trailing ``/`` stripped.

    Raises:
        ValueError: If the resulting URL is empty.
    """
    candidate = explicit if explicit else os.environ.get("REUSE_SURFACE_HUB_URL", "")
    trimmed = candidate.rstrip("/")
    if trimmed:
        return trimmed
    raise ValueError(
        "hub URL not configured; set REUSE_SURFACE_HUB_URL or pass --hub-url"
    )
|
|
|
|
|
|
def hub_token() -> str | None:
    """Return the value of ``REUSE_SURFACE_HUB_TOKEN``, or ``None`` if unset."""
    return os.getenv("REUSE_SURFACE_HUB_TOKEN")
|
|
|
|
|
|
def _request(
    method: str,
    url: str,
    *,
    token: str | None = None,
    body: dict[str, Any] | None = None,
) -> tuple[int, Any]:
    """Send one JSON request and return ``(status, payload)``.

    Args:
        method: HTTP method name passed to ``urllib.request.Request``.
        url: Absolute URL to call.
        token: Optional bearer token; when truthy it is sent as
            ``Authorization: Bearer <token>``.
        body: Optional mapping; when not ``None`` it is serialised with
            ``json.dumps`` and sent with ``Content-Type: application/json``.

    Returns:
        ``(status, payload)``. For a successful response ``payload`` is the
        decoded JSON body, or ``None`` when the body is empty. For an HTTP
        error response ``status`` is the error code and ``payload`` is the
        decoded JSON error body, or ``{"message": ...}`` holding the raw text
        (or the reason phrase when the body is empty).

    Raises:
        json.JSONDecodeError: If a *successful* response body is not JSON.
        UnicodeDecodeError: If a *successful* response body is not UTF-8.
        Exception: Anything ``urlopen`` raises other than ``HTTPError``
            (only ``HTTPError`` is handled here) propagates unchanged.
    """
    headers = {"Accept": "application/json", "User-Agent": "reuse-surface/0.1"}
    data = None
    if body is not None:
        headers["Content-Type"] = "application/json"
        data = json.dumps(body).encode("utf-8")
    if token:
        headers["Authorization"] = f"Bearer {token}"
    request = urllib.request.Request(url, data=data, headers=headers, method=method)
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            raw = response.read().decode("utf-8")
            return response.status, json.loads(raw) if raw else None
    except urllib.error.HTTPError as exc:
        # HTTPError wraps the response stream; close it once the body is read
        # so the underlying connection is not left to the garbage collector.
        try:
            # Error pages are not guaranteed to be UTF-8. Decode leniently so
            # the caller still receives the status code instead of a
            # UnicodeDecodeError that hides it.
            raw = exc.read().decode("utf-8", errors="replace")
        finally:
            exc.close()
        try:
            payload = json.loads(raw) if raw else {"message": exc.reason}
        except json.JSONDecodeError:
            payload = {"message": raw or exc.reason}
        return exc.code, payload
|
|
|
|
|
|
def hub_status(base_url: str | None = None) -> tuple[int, Any]:
    """GET ``<hub>/health`` and return the ``(status, payload)`` from ``_request``.

    Args:
        base_url: Optional hub URL, resolved through ``hub_base_url``.
    """
    root = hub_base_url(base_url)
    endpoint = f"{root}/health"
    return _request("GET", endpoint)
|
|
|
|
|
|
def hub_list(base_url: str | None = None) -> tuple[int, Any]:
    """GET ``<hub>/v1/repos`` and return the ``(status, payload)`` from ``_request``.

    Args:
        base_url: Optional hub URL, resolved through ``hub_base_url``.
    """
    root = hub_base_url(base_url)
    endpoint = f"{root}/v1/repos"
    return _request("GET", endpoint)
|
|
|
|
|
|
def hub_show(repo: str, base_url: str | None = None) -> tuple[int, Any]:
    """GET ``<hub>/v1/repos/<repo>`` and return ``(status, payload)``.

    Args:
        repo: Repository identifier, interpolated into the URL path as-is
            (no percent-encoding is applied here).
        base_url: Optional hub URL, resolved through ``hub_base_url``.
    """
    root = hub_base_url(base_url)
    endpoint = f"{root}/v1/repos/{repo}"
    return _request("GET", endpoint)
|
|
|
|
|
|
def hub_register(payload: dict[str, Any], base_url: str | None = None) -> tuple[int, Any]:
    """POST ``payload`` to ``<hub>/v1/repos`` using the token from ``hub_token``.

    Args:
        payload: Mapping sent as the JSON request body.
        base_url: Optional hub URL, resolved through ``hub_base_url``.

    Returns:
        The ``(status, payload)`` tuple produced by ``_request``.

    Raises:
        ValueError: If ``hub_token()`` returns nothing truthy. This is checked
            before the base URL is resolved.
    """
    bearer = hub_token()
    if bearer:
        endpoint = f"{hub_base_url(base_url)}/v1/repos"
        return _request("POST", endpoint, token=bearer, body=payload)
    raise ValueError("REUSE_SURFACE_HUB_TOKEN is required for register")
|
|
|
|
|
|
def hub_update(
    repo: str, payload: dict[str, Any], base_url: str | None = None
) -> tuple[int, Any]:
    """PATCH ``<hub>/v1/repos/<repo>`` with ``payload`` using the ``hub_token`` token.

    Args:
        repo: Repository identifier, interpolated into the URL path as-is
            (no percent-encoding is applied here).
        payload: Mapping sent as the JSON request body.
        base_url: Optional hub URL, resolved through ``hub_base_url``.

    Returns:
        The ``(status, payload)`` tuple produced by ``_request``.

    Raises:
        ValueError: If ``hub_token()`` returns nothing truthy. This is checked
            before the base URL is resolved.
    """
    bearer = hub_token()
    if bearer:
        endpoint = f"{hub_base_url(base_url)}/v1/repos/{repo}"
        return _request("PATCH", endpoint, token=bearer, body=payload)
    raise ValueError("REUSE_SURFACE_HUB_TOKEN is required for update")