generated from coulomb/repo-seed
Some checks failed
ci / validate-registry (push) Has been cancelled
Use reuse.coulomb.social, REUSE_SURFACE_URL/TOKEN env vars, reuse-surface image and reuse-surface-env secret. Replace reuse-surface-hub entrypoint with reuse-surface serve; CLI uses --base-url.
143 lines
4.9 KiB
Python
143 lines
4.9 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import yaml
|
|
from fastapi import Depends, FastAPI, Header, HTTPException, Query, Request
|
|
from fastapi.responses import JSONResponse, Response
|
|
|
|
from reuse_surface.hub.compose import compose_from_store, DEFAULT_DOMAIN
|
|
from reuse_surface.hub.store import HubStore
|
|
|
|
# Version string passed to the FastAPI app and echoed by the /health endpoint.
HUB_VERSION = "0.1.0"
|
|
|
|
|
|
def _db_path() -> Path:
    """Return the hub database path.

    The location is taken from the ``REUSE_SURFACE_DB`` environment variable
    and falls back to ``/data/reuse.db`` when the variable is not set.
    """
    configured = os.getenv("REUSE_SURFACE_DB", "/data/reuse.db")
    return Path(configured)
|
|
|
|
|
|
def _cache_dir() -> Path:
    """Return the cache directory path.

    The location is taken from the ``REUSE_SURFACE_CACHE_DIR`` environment
    variable and falls back to ``/data/cache`` when the variable is not set.
    """
    configured = os.getenv("REUSE_SURFACE_CACHE_DIR", "/data/cache")
    return Path(configured)
|
|
|
|
|
|
def _write_token() -> str:
    """Return the write token from ``REUSE_SURFACE_TOKEN``.

    An empty string is returned when the variable is not set.
    """
    configured = os.getenv("REUSE_SURFACE_TOKEN", "")
    return configured
|
|
|
|
|
|
def _store() -> HubStore:
    """Construct a ``HubStore`` for the configured database path."""
    location = _db_path()
    return HubStore(location)
|
|
|
|
|
|
def _http_error(status: int, error: str, message: str) -> HTTPException:
    """Build an ``HTTPException`` carrying the hub's structured error body.

    Args:
        status: HTTP status code for the response.
        error: Short machine-readable error code.
        message: Human-readable description of the failure.

    Returns:
        An exception whose ``detail`` is a dict with ``error``, ``message``
        and an empty ``details`` list. The caller is expected to raise it.
    """
    body = {"error": error, "message": message, "details": []}
    return HTTPException(status_code=status, detail=body)
|
|
|
|
|
|
def _require_auth(authorization: str | None = Header(default=None)) -> None:
    """Reject the request unless it carries the configured write token.

    Used as a FastAPI dependency on the mutating routes.

    Args:
        authorization: Value of the ``Authorization`` request header, or
            ``None`` when the header is absent.

    Raises:
        HTTPException: 503 ``misconfigured`` when ``REUSE_SURFACE_TOKEN`` is
            unset or empty; 401 ``unauthorized`` when the header is missing,
            does not start with ``Bearer ``, or carries a different token.
    """
    # Local import keeps the block self-contained (same pattern as main()).
    import hmac

    write_token = _write_token()
    if not write_token:
        raise _http_error(
            503, "misconfigured", "REUSE_SURFACE_TOKEN is not configured"
        )
    if not authorization or not authorization.startswith("Bearer "):
        raise _http_error(401, "unauthorized", "Bearer token required")

    token = authorization.removeprefix("Bearer ").strip()
    # Compare in constant time so response latency does not reveal how many
    # leading characters of a guessed token were correct. compare_digest only
    # accepts ASCII-only str, so both sides are encoded to bytes first;
    # "surrogatepass" keeps the encoding lossless for any str value.
    supplied = token.encode("utf-8", "surrogatepass")
    expected = write_token.encode("utf-8", "surrogatepass")
    if not hmac.compare_digest(supplied, expected):
        raise _http_error(401, "unauthorized", "Invalid token")
|
|
|
|
|
|
def create_app() -> FastAPI:
    """Build the federation hub FastAPI application.

    A single ``HubStore`` is created up front and shared by every route
    handler through closure. Routes that modify state depend on
    ``_require_auth``; read-only routes are unauthenticated.

    Returns:
        The configured ``FastAPI`` instance with all ``/health`` and ``/v1``
        routes registered.
    """
    app = FastAPI(title="reuse-surface federation hub", version=HUB_VERSION)
    store = _store()

    async def _read_json_body(request: Request) -> Any:
        # Parse the request body as JSON. Without this guard a malformed body
        # escapes as an unhandled exception (HTTP 500); report it as a client
        # error in the same envelope the other validation failures use.
        try:
            return await request.json()
        except ValueError as exc:
            # json.JSONDecodeError and UnicodeDecodeError both derive from
            # ValueError, so one handler covers bad syntax and bad encoding.
            raise _http_error(
                400, "validation_error", "request body is not valid JSON"
            ) from exc

    # Liveness probe: static payload, no store access.
    @app.get("/health")
    def health() -> dict[str, str]:
        return {"status": "ok", "service": "reuse-surface", "version": HUB_VERSION}

    # List every registration known to the store.
    @app.get("/v1/repos")
    def list_repos() -> dict[str, Any]:
        repos = store.list_repos()
        return {"count": len(repos), "repos": repos}

    # Create a registration; FileExistsError -> 409, ValueError -> 400.
    @app.post("/v1/repos", status_code=201, dependencies=[Depends(_require_auth)])
    async def register_repo(request: Request) -> dict[str, Any]:
        payload = await _read_json_body(request)
        try:
            return store.create_repo(payload)
        except FileExistsError as exc:
            raise _http_error(409, "conflict", str(exc)) from exc
        except ValueError as exc:
            raise _http_error(400, "validation_error", str(exc)) from exc

    # Fetch one registration; a None result from the store becomes a 404.
    @app.get("/v1/repos/{repo}")
    def get_repo(repo: str) -> dict[str, Any]:
        registration = store.get_repo(repo)
        if registration is None:
            raise _http_error(404, "not_found", f"repo not found: {repo}")
        return registration

    # Update a registration; KeyError -> 404, ValueError -> 400.
    @app.patch("/v1/repos/{repo}", dependencies=[Depends(_require_auth)])
    async def update_repo(repo: str, request: Request) -> dict[str, Any]:
        payload = await _read_json_body(request)
        try:
            return store.update_repo(repo, payload)
        except KeyError as exc:
            raise _http_error(404, "not_found", str(exc)) from exc
        except ValueError as exc:
            raise _http_error(400, "validation_error", str(exc)) from exc

    # Delete a registration; a falsy result from the store becomes a 404.
    @app.delete("/v1/repos/{repo}", status_code=204, dependencies=[Depends(_require_auth)])
    def delete_repo(repo: str) -> Response:
        if not store.delete_repo(repo):
            raise _http_error(404, "not_found", f"repo not found: {repo}")
        return Response(status_code=204)

    def _federated_response(
        refresh: bool,
        accept: str | None,
        format_param: str,
    ) -> Response:
        # Compose the federated document and render it as YAML or JSON.
        # refresh is forwarded to compose_from_store; accept is the raw
        # Accept header (may be None); format_param is the ?format= value.
        try:
            federated, warnings = compose_from_store(
                store, refresh=refresh, cache_dir=_cache_dir(), domain=DEFAULT_DOMAIN
            )
        except FileNotFoundError as exc:
            raise _http_error(502, "compose_error", str(exc)) from exc

        # YAML is chosen by an explicit ?format=yaml or by any Accept header
        # that mentions "yaml"; everything else gets JSON.
        use_yaml = format_param == "yaml" or (accept and "yaml" in accept.lower())
        headers: dict[str, str] = {}
        if warnings:
            # NOTE(review): warnings are joined verbatim into a header value;
            # confirm they cannot contain newlines or non-latin-1 characters.
            headers["X-Federation-Warnings"] = "; ".join(warnings)
        if use_yaml:
            body = yaml.safe_dump(federated, sort_keys=False)
            return Response(content=body, media_type="application/yaml", headers=headers)
        return JSONResponse(content=federated, headers=headers)

    # Read the federated document; ?refresh= is passed through to compose.
    @app.get("/v1/federated", response_model=None)
    def get_federated(
        request: Request,
        refresh: bool = Query(default=False),
        format: str = Query(default="json"),
    ) -> Response:
        return _federated_response(refresh, request.headers.get("accept"), format)

    # Authenticated variant that always composes with refresh=True.
    @app.post("/v1/federated/compose", response_model=None, dependencies=[Depends(_require_auth)])
    def compose_federated(
        request: Request,
        format: str = Query(default="json"),
    ) -> Response:
        return _federated_response(True, request.headers.get("accept"), format)

    return app
|
|
|
|
|
|
def main() -> None:
    """Serve the hub application with uvicorn.

    The bind address comes from ``REUSE_SURFACE_HOST`` (default ``0.0.0.0``)
    and the port from ``REUSE_SURFACE_PORT`` (default ``8000``). Auto-reload
    is disabled.
    """
    import uvicorn

    bind_host = os.environ.get("REUSE_SURFACE_HOST", "0.0.0.0")
    raw_port = os.environ.get("REUSE_SURFACE_PORT", "8000")
    bind_port = int(raw_port)

    application = create_app()
    uvicorn.run(application, host=bind_host, port=bind_port, reload=False)