Files
reuse-surface/reuse_surface/hub_client.py
tegwick cbcd097214
Some checks failed
ci / validate-registry (push) Has been cancelled
Align naming with coulomb.social reuse-surface conventions
Use reuse.coulomb.social, REUSE_SURFACE_URL/TOKEN env vars, reuse-surface
image and reuse-surface-env secret. Replace reuse-surface-hub entrypoint with
reuse-surface serve; CLI uses --base-url.
2026-06-15 09:02:02 +02:00

86 lines
2.6 KiB
Python

from __future__ import annotations
import json
import os
import urllib.error
import urllib.request
from typing import Any
def service_base_url(explicit: str | None = None) -> str:
    """Resolve the base URL of the reuse-surface service.

    A truthy ``explicit`` value wins; otherwise the ``REUSE_SURFACE_URL``
    environment variable is consulted. Trailing slashes are removed so that
    callers can append ``/path`` segments directly.

    Args:
        explicit: Base URL supplied by the caller, or ``None``/empty to fall
            back to the environment.

    Returns:
        The base URL with every trailing ``/`` stripped.

    Raises:
        ValueError: If neither source yields a non-empty URL.
    """
    candidate = explicit if explicit else os.environ.get("REUSE_SURFACE_URL", "")
    trimmed = candidate.rstrip("/")
    if trimmed:
        return trimmed
    raise ValueError(
        "service URL not configured; set REUSE_SURFACE_URL or pass --base-url"
    )
def service_token() -> str | None:
    """Return the value of ``REUSE_SURFACE_TOKEN``, or ``None`` if it is unset."""
    return os.getenv("REUSE_SURFACE_TOKEN")
def _request(
method: str,
url: str,
*,
token: str | None = None,
body: dict[str, Any] | None = None,
) -> tuple[int, Any]:
headers = {"Accept": "application/json", "User-Agent": "reuse-surface/0.1"}
data = None
if body is not None:
headers["Content-Type"] = "application/json"
data = json.dumps(body).encode("utf-8")
if token:
headers["Authorization"] = f"Bearer {token}"
request = urllib.request.Request(url, data=data, headers=headers, method=method)
try:
with urllib.request.urlopen(request, timeout=30) as response:
raw = response.read().decode("utf-8")
return response.status, json.loads(raw) if raw else None
except urllib.error.HTTPError as exc:
raw = exc.read().decode("utf-8")
try:
payload = json.loads(raw) if raw else {"message": exc.reason}
except json.JSONDecodeError:
payload = {"message": raw or exc.reason}
return exc.code, payload
def hub_status(base_url: str | None = None) -> tuple[int, Any]:
    """GET the service's ``/health`` endpoint without authentication.

    Args:
        base_url: Optional base URL; resolved through ``service_base_url``.

    Returns:
        The ``(status_code, payload)`` tuple produced by ``_request``.
    """
    root = service_base_url(base_url)
    return _request("GET", root + "/health")
def hub_list(base_url: str | None = None) -> tuple[int, Any]:
    """GET the ``/v1/repos`` collection without authentication.

    Args:
        base_url: Optional base URL; resolved through ``service_base_url``.

    Returns:
        The ``(status_code, payload)`` tuple produced by ``_request``.
    """
    root = service_base_url(base_url)
    return _request("GET", root + "/v1/repos")
def hub_show(repo: str, base_url: str | None = None) -> tuple[int, Any]:
    """GET a single entry at ``/v1/repos/<repo>`` without authentication.

    ``repo`` is percent-encoded before being placed in the URL path, so
    characters such as spaces, ``?`` and ``#`` cannot truncate or invalidate
    the request URL. ``/`` is left as-is so identifiers that span several
    path segments keep addressing the same path as before.
    NOTE(review): a ``repo`` value that is already percent-encoded will be
    encoded again; confirm callers pass the raw identifier.

    Args:
        repo: Repository identifier to look up.
        base_url: Optional base URL; resolved through ``service_base_url``.

    Returns:
        The ``(status_code, payload)`` tuple produced by ``_request``.
    """
    from urllib.parse import quote

    repo_path = quote(repo, safe="/")
    return _request("GET", f"{service_base_url(base_url)}/v1/repos/{repo_path}")
def hub_register(payload: dict[str, Any], base_url: str | None = None) -> tuple[int, Any]:
    """POST ``payload`` to ``/v1/repos`` using the configured bearer token.

    The token is checked before the base URL is resolved, so a missing token
    is reported first.

    Args:
        payload: JSON-serializable body to send.
        base_url: Optional base URL; resolved through ``service_base_url``.

    Returns:
        The ``(status_code, payload)`` tuple produced by ``_request``.

    Raises:
        ValueError: If ``service_token`` returns a falsy value.
    """
    bearer = service_token()
    if bearer:
        endpoint = f"{service_base_url(base_url)}/v1/repos"
        return _request("POST", endpoint, token=bearer, body=payload)
    raise ValueError("REUSE_SURFACE_TOKEN is required for register")
def hub_update(
    repo: str, payload: dict[str, Any], base_url: str | None = None
) -> tuple[int, Any]:
    """PATCH ``/v1/repos/<repo>`` with ``payload`` using the bearer token.

    ``repo`` is percent-encoded before being placed in the URL path, so
    characters such as spaces, ``?`` and ``#`` cannot truncate or invalidate
    the request URL. ``/`` is left as-is so identifiers that span several
    path segments keep addressing the same path as before.
    NOTE(review): a ``repo`` value that is already percent-encoded will be
    encoded again; confirm callers pass the raw identifier.

    Args:
        repo: Repository identifier to update.
        payload: JSON-serializable body to send.
        base_url: Optional base URL; resolved through ``service_base_url``.

    Returns:
        The ``(status_code, payload)`` tuple produced by ``_request``.

    Raises:
        ValueError: If ``service_token`` returns a falsy value.
    """
    from urllib.parse import quote

    token = service_token()
    if not token:
        raise ValueError("REUSE_SURFACE_TOKEN is required for update")
    repo_path = quote(repo, safe="/")
    return _request(
        "PATCH",
        f"{service_base_url(base_url)}/v1/repos/{repo_path}",
        token=token,
        body=payload,
    )