generated from coulomb/repo-seed
Some checks failed
ci / validate-registry (push) Has been cancelled
Use reuse.coulomb.social, REUSE_SURFACE_URL/TOKEN env vars, reuse-surface image and reuse-surface-env secret. Replace reuse-surface-hub entrypoint with reuse-surface serve; CLI uses --base-url.
86 lines
2.6 KiB
Python
86 lines
2.6 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import urllib.error
|
|
import urllib.request
|
|
from typing import Any
|
|
|
|
|
|
def service_base_url(explicit: str | None = None) -> str:
    """Resolve the service base URL, without trailing slashes.

    Args:
        explicit: URL supplied by the caller. When falsy (``None`` or an
            empty string) the ``REUSE_SURFACE_URL`` environment variable is
            consulted instead.

    Returns:
        The chosen URL with every trailing ``/`` removed.

    Raises:
        ValueError: If neither source yields a non-empty URL once trailing
            slashes have been stripped.
    """
    candidate = explicit if explicit else os.environ.get("REUSE_SURFACE_URL", "")
    trimmed = candidate.rstrip("/")
    if trimmed:
        return trimmed
    raise ValueError(
        "service URL not configured; set REUSE_SURFACE_URL or pass --base-url"
    )
|
|
|
|
|
|
def service_token() -> str | None:
    """Return the value of ``REUSE_SURFACE_TOKEN``, or ``None`` when unset.

    The value is returned as-is; an empty string is not converted to
    ``None``.
    """
    return os.getenv("REUSE_SURFACE_TOKEN")
|
|
|
|
|
|
def _request(
    method: str,
    url: str,
    *,
    token: str | None = None,
    body: dict[str, Any] | None = None,
) -> tuple[int, Any]:
    """Send one JSON request and return ``(status, payload)``.

    Args:
        method: HTTP verb handed to ``urllib.request.Request``.
        url: Absolute URL to call.
        token: When truthy, sent as ``Authorization: Bearer <token>``.
        body: When not ``None``, serialized with ``json.dumps`` and sent as
            the request body with ``Content-Type: application/json``.

    Returns:
        On success, the response status and the decoded JSON body (``None``
        for an empty body). When the server answers with an HTTP error, the
        error code and the decoded JSON error body; if that body is empty or
        not JSON, ``{"message": ...}`` carrying the raw text or the HTTP
        reason phrase.

    Raises:
        UnicodeDecodeError, json.JSONDecodeError: If a *successful* response
            body is not valid UTF-8 JSON.

    Exceptions raised by ``urlopen`` other than ``HTTPError`` are not caught
    here and propagate to the caller.
    """
    headers = {"Accept": "application/json", "User-Agent": "reuse-surface/0.1"}
    data = None
    if body is not None:
        headers["Content-Type"] = "application/json"
        data = json.dumps(body).encode("utf-8")
    if token:
        headers["Authorization"] = f"Bearer {token}"
    request = urllib.request.Request(url, data=data, headers=headers, method=method)
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            raw = response.read().decode("utf-8")
            return response.status, json.loads(raw) if raw else None
    except urllib.error.HTTPError as exc:
        # HTTPError wraps the open response stream; close it once the body
        # has been read so the underlying connection is released promptly.
        with exc:
            # Error pages are not guaranteed to be UTF-8 (e.g. proxy HTML).
            # Decode leniently so the caller still receives the status code
            # instead of an unrelated UnicodeDecodeError.
            raw = exc.read().decode("utf-8", errors="replace")
        try:
            payload = json.loads(raw) if raw else {"message": exc.reason}
        except json.JSONDecodeError:
            payload = {"message": raw or exc.reason}
        return exc.code, payload
|
|
|
|
|
|
def hub_status(base_url: str | None = None) -> tuple[int, Any]:
    """Issue ``GET <base>/health`` against the service.

    Args:
        base_url: Optional explicit base URL; resolved through
            ``service_base_url``.

    Returns:
        The ``(status, payload)`` tuple produced by ``_request``. No token
        is sent.
    """
    root = service_base_url(base_url)
    return _request("GET", root + "/health")
|
|
|
|
|
|
def hub_list(base_url: str | None = None) -> tuple[int, Any]:
    """Issue ``GET <base>/v1/repos`` against the service.

    Args:
        base_url: Optional explicit base URL; resolved through
            ``service_base_url``.

    Returns:
        The ``(status, payload)`` tuple produced by ``_request``. No token
        is sent.
    """
    root = service_base_url(base_url)
    return _request("GET", root + "/v1/repos")
|
|
|
|
|
|
def hub_show(repo: str, base_url: str | None = None) -> tuple[int, Any]:
    """Issue ``GET <base>/v1/repos/<repo>`` against the service.

    Args:
        repo: Identifier appended verbatim to the ``/v1/repos/`` path (it is
            not URL-quoted here).
        base_url: Optional explicit base URL; resolved through
            ``service_base_url``.

    Returns:
        The ``(status, payload)`` tuple produced by ``_request``. No token
        is sent.
    """
    root = service_base_url(base_url)
    endpoint = f"{root}/v1/repos/{repo}"
    return _request("GET", endpoint)
|
|
|
|
|
|
def hub_register(payload: dict[str, Any], base_url: str | None = None) -> tuple[int, Any]:
    """Issue an authenticated ``POST <base>/v1/repos`` carrying ``payload``.

    Args:
        payload: Dictionary sent as the JSON request body.
        base_url: Optional explicit base URL; resolved through
            ``service_base_url``.

    Returns:
        The ``(status, payload)`` tuple produced by ``_request``.

    Raises:
        ValueError: If ``service_token()`` returns nothing usable. The token
            is checked before the base URL is resolved, so this error takes
            precedence over a missing-URL error.
    """
    bearer = service_token()
    if bearer:
        endpoint = f"{service_base_url(base_url)}/v1/repos"
        return _request("POST", endpoint, token=bearer, body=payload)
    raise ValueError("REUSE_SURFACE_TOKEN is required for register")
|
|
|
|
|
|
def hub_update(
    repo: str, payload: dict[str, Any], base_url: str | None = None
) -> tuple[int, Any]:
    """Issue an authenticated ``PATCH <base>/v1/repos/<repo>`` with ``payload``.

    Args:
        repo: Identifier appended verbatim to the ``/v1/repos/`` path (it is
            not URL-quoted here).
        payload: Dictionary sent as the JSON request body.
        base_url: Optional explicit base URL; resolved through
            ``service_base_url``.

    Returns:
        The ``(status, payload)`` tuple produced by ``_request``.

    Raises:
        ValueError: If ``service_token()`` returns nothing usable. The token
            is checked before the base URL is resolved, so this error takes
            precedence over a missing-URL error.
    """
    bearer = service_token()
    if bearer:
        endpoint = f"{service_base_url(base_url)}/v1/repos/{repo}"
        return _request("PATCH", endpoint, token=bearer, body=payload)
    raise ValueError("REUSE_SURFACE_TOKEN is required for update")