generated from coulomb/repo-seed
277 lines
12 KiB
Python
277 lines
12 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
from dataclasses import dataclass
|
|
from http import HTTPStatus
|
|
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
|
from pathlib import Path
|
|
from typing import Any
|
|
from urllib.parse import parse_qs, urlparse
|
|
|
|
from .graph_explorer import fabric_graph_explorer_manifest, fabric_graph_explorer_payload
|
|
from .graph_explorer_ui import graph_explorer_page
|
|
from .registry import (
|
|
RegistryError,
|
|
RegistryStore,
|
|
backstage_projection,
|
|
blast_radius,
|
|
consumers,
|
|
dependency_path_lines,
|
|
library_xregistry_projection,
|
|
providers,
|
|
unresolved_dependencies,
|
|
xregistry_projection,
|
|
)
|
|
|
|
|
|
@dataclass(frozen=True)
class HtmlResponse:
    """Immutable wrapper marking a response body as HTML.

    The request dispatcher checks for this type to decide whether a route's
    result is sent as ``text/html`` instead of being serialised as JSON.
    """

    # Complete HTML document text to send to the client.
    body: str
|
|
|
|
|
|
class RegistryHandler(BaseHTTPRequestHandler):
    """HTTP handler that exposes a ``RegistryStore`` over GET/POST routes.

    Route functions return ``(status, body)``. A body that is an
    ``HtmlResponse`` is sent as HTML; anything else is serialised as JSON.
    ``RegistryError`` raised while routing is turned into a JSON error
    response carrying the error's own status code.
    """

    # Backing store shared by all requests; must be assigned on the handler
    # class (or a subclass) before the server starts handling requests.
    store: RegistryStore

    def do_GET(self) -> None:
        """Serve a GET request through the shared dispatch wrapper."""
        self._handle(self._get)

    def do_POST(self) -> None:
        """Serve a POST request through the shared dispatch wrapper."""
        self._handle(self._post)

    def log_message(self, format: str, *args: object) -> None:
        """Discard the base class's per-request log output."""
        return

    def _get(self, path: str, query: dict[str, list[str]]) -> tuple[int, Any]:
        """Resolve a GET route.

        Args:
            path: URL path component (no query string).
            query: Parsed query string as produced by ``parse_qs``.

        Returns:
            ``(status, body)`` for the matched route.

        Raises:
            RegistryError: 404 when no route matches, or whatever the store
                and query helpers raise for the matched route.
        """
        parts = _parts(path)

        # --- service endpoints -------------------------------------------
        # Matched on normalised segments like every other route, so a
        # trailing slash ("/health/") no longer yields a 404.
        if parts == ["health"]:
            return HTTPStatus.OK, {"status": "ok"}
        if parts == ["ui", "graph-explorer"]:
            return HTTPStatus.OK, HtmlResponse(graph_explorer_page())
        if parts == ["status"]:
            return HTTPStatus.OK, self.store.status()

        # --- repositories and registry snapshots -------------------------
        if parts == ["repositories"]:
            return HTTPStatus.OK, self.store.list_repositories()
        if len(parts) == 3 and parts[0] == "repositories" and parts[2] == "inventory":
            return HTTPStatus.OK, self.store.repository_inventory(parts[1])
        if len(parts) == 2 and parts[0] == "repositories":
            return HTTPStatus.OK, self.store.get_repository(parts[1])
        if len(parts) == 3 and parts[0] == "repositories" and parts[2] == "snapshots":
            return HTTPStatus.OK, self.store.list_snapshots(parts[1])
        if len(parts) == 4 and parts[0] == "repositories" and parts[2] == "snapshots" and parts[3] == "latest":
            return HTTPStatus.OK, self.store.latest_snapshot(parts[1])
        if len(parts) == 4 and parts[0] == "repositories" and parts[2] == "snapshots" and parts[3] == "diff":
            return HTTPStatus.OK, self.store.snapshot_diff(
                parts[1],
                from_id=_query_optional_int(query, "from_id"),
                to_id=_query_optional_int(query, "to_id"),
            )

        # --- discovery snapshots -----------------------------------------
        if len(parts) == 3 and parts[0] == "repositories" and parts[2] == "discovery-snapshots":
            return HTTPStatus.OK, self.store.list_discovery_snapshots(
                parts[1],
                profile=_query_optional(query, "profile"),
            )
        # "latest" and "diff" must be tested before the generic
        # ".../discovery-snapshots/<id>" route below, which would otherwise
        # try to parse them as integer ids.
        if len(parts) == 4 and parts[0] == "repositories" and parts[2] == "discovery-snapshots" and parts[3] == "latest":
            return HTTPStatus.OK, self.store.latest_discovery_snapshot(
                parts[1],
                profile=_query_optional(query, "profile"),
            )
        if len(parts) == 4 and parts[0] == "repositories" and parts[2] == "discovery-snapshots" and parts[3] == "diff":
            return HTTPStatus.OK, self.store.discovery_snapshot_diff(
                parts[1],
                from_id=_query_optional_int(query, "from_id"),
                to_id=_query_optional_int(query, "to_id"),
                profile=_query_optional(query, "profile"),
            )
        if len(parts) == 4 and parts[0] == "repositories" and parts[2] == "discovery-snapshots":
            snapshot = self.store.get_discovery_snapshot(_int_id(parts[3], "discovery_snapshot_id"))
            # The snapshot is looked up by id alone, so reject it when it
            # belongs to a different repository than the one in the URL.
            if snapshot["repo_slug"] != parts[1]:
                raise RegistryError("discovery snapshot id must belong to the requested repository", 404)
            return HTTPStatus.OK, snapshot

        # --- search and graph queries ------------------------------------
        if parts == ["search"]:
            return HTTPStatus.OK, self.store.search(_query_one(query, "q"))
        if parts == ["graph", "nodes"]:
            return HTTPStatus.OK, self.store.combined_graph()["nodes"]
        if len(parts) == 3 and parts[0] == "graph" and parts[1] == "nodes":
            return HTTPStatus.OK, self.store.graph_node_detail(parts[2])
        if parts == ["graph", "providers"]:
            return HTTPStatus.OK, providers(self.store.combined_graph(), _query_one(query, "capability_type"))
        if parts == ["graph", "consumers"]:
            return HTTPStatus.OK, consumers(self.store.combined_graph(), _query_one(query, "target"))
        if parts == ["graph", "unresolved"]:
            return HTTPStatus.OK, unresolved_dependencies(self.store.combined_graph())
        if parts == ["graph", "blast-radius"]:
            return HTTPStatus.OK, blast_radius(self.store.combined_graph(), _query_one(query, "interface_id"))
        if parts == ["graph", "dependency-path"]:
            return HTTPStatus.OK, {"lines": dependency_path_lines(self.store.combined_graph(), _query_one(query, "service_id"))}

        # --- exports ------------------------------------------------------
        if parts == ["exports", "state-hub"]:
            return HTTPStatus.OK, self.store.combined_graph()
        if parts == ["exports", "reset-archive"]:
            return HTTPStatus.OK, self.store.reset_archive()
        if parts == ["exports", "backstage"]:
            return HTTPStatus.OK, backstage_projection(self.store.combined_graph())
        if parts == ["exports", "xregistry"]:
            return HTTPStatus.OK, xregistry_projection(self.store.combined_graph())
        if parts == ["exports", "graph-explorer"]:
            return HTTPStatus.OK, fabric_graph_explorer_payload(
                self.store.combined_graph(),
                self.store.list_repositories(),
                {str(snapshot["repo_slug"]) for snapshot in self.store.latest_snapshots()},
            )
        if parts == ["exports", "graph-explorer", "manifest"]:
            return HTTPStatus.OK, fabric_graph_explorer_manifest()

        # --- artifacts and libraries -------------------------------------
        if parts == ["artifacts"]:
            return HTTPStatus.OK, self.store.list_artifacts(
                repo_slug=_query_optional(query, "repo_slug"),
                target_id=_query_optional(query, "target_id"),
                # "type" is accepted as a fallback name for the same filter.
                artifact_type=_query_optional(query, "artifact_type") or _query_optional(query, "type"),
            )
        if len(parts) == 2 and parts[0] == "artifacts":
            return HTTPStatus.OK, self.store.get_artifact(_int_id(parts[1], "artifact_id"))
        if parts == ["libraries"]:
            return HTTPStatus.OK, self.store.list_libraries(
                repo_slug=_query_optional(query, "repo_slug"),
                name=_query_optional(query, "name"),
                purl=_query_optional(query, "purl"),
                # "type" is accepted as a fallback name for the same filter.
                component_type=_query_optional(query, "component_type") or _query_optional(query, "type"),
            )
        if len(parts) == 2 and parts[0] == "libraries":
            return HTTPStatus.OK, self.store.get_library(_int_id(parts[1], "library_id"))
        if len(parts) == 3 and parts[0] == "repositories" and parts[2] == "libraries":
            return HTTPStatus.OK, self.store.list_libraries(repo_slug=parts[1])
        if parts == ["exports", "libraries", "xregistry"]:
            return HTTPStatus.OK, library_xregistry_projection(self.store.list_libraries())

        raise RegistryError(f"route not found: {path}", 404)

    def _post(self, path: str, _query: dict[str, list[str]]) -> tuple[int, Any]:
        """Resolve a POST route.

        The JSON body is read before route matching, so a malformed body is
        reported even when the path matches no route.

        Args:
            path: URL path component (no query string).
            _query: Parsed query string; unused by POST routes.

        Returns:
            ``(HTTPStatus.CREATED, body)`` for the matched route.

        Raises:
            RegistryError: 404 when no route matches, or whatever the store
                raises for the matched route.
            json.JSONDecodeError: When the request body is not valid JSON.
        """
        parts = _parts(path)
        body = self._read_json()

        if parts == ["repositories"]:
            return HTTPStatus.CREATED, self.store.upsert_repository(body)
        if len(parts) == 3 and parts[0] == "repositories" and parts[2] == "snapshots":
            return HTTPStatus.CREATED, self.store.add_snapshot(parts[1], body)
        if len(parts) == 3 and parts[0] == "repositories" and parts[2] == "discovery-snapshots":
            return HTTPStatus.CREATED, self.store.add_discovery_snapshot(parts[1], body)
        if len(parts) == 5 and parts[0] == "repositories" and parts[2] == "discovery-snapshots" and parts[4] == "accept":
            return HTTPStatus.CREATED, self.store.accept_discovery_snapshot(
                parts[1],
                _int_id(parts[3], "discovery_snapshot_id"),
                body,
            )
        if len(parts) == 4 and parts[0] == "repositories" and parts[2] == "libraries" and parts[3] == "cyclonedx":
            return HTTPStatus.CREATED, self.store.ingest_cyclonedx(parts[1], body)
        if parts == ["admin", "reset-graph-data"]:
            return HTTPStatus.CREATED, self.store.reset_graph_data(body)
        if parts == ["artifacts"]:
            return HTTPStatus.CREATED, self.store.add_artifact(body)

        raise RegistryError(f"route not found: {path}", 404)

    def _handle(self, action: Any) -> None:
        """Run a route function and write its result or error as the response.

        Args:
            action: Callable taking ``(path, query)`` and returning
                ``(status, body)``; in this class, ``_get`` or ``_post``.
        """
        parsed = urlparse(self.path)
        query = parse_qs(parsed.query)
        try:
            status, body = action(parsed.path, query)
            if isinstance(body, HtmlResponse):
                self._send_text(int(status), body.body, "text/html; charset=utf-8")
            else:
                self._send_json(int(status), body)
        except RegistryError as exc:
            self._send_json(exc.status_code, {"error": exc.message})
        except json.JSONDecodeError as exc:
            self._send_json(400, {"error": f"invalid JSON request body: {exc}"})
        except Exception as exc:
            # Last-resort boundary: report any other failure as a 500 rather
            # than letting it propagate out of the handler.
            self._send_json(500, {"error": str(exc)})

    def _read_json(self) -> dict[str, Any]:
        """Read the request body and decode it as a JSON object.

        Returns:
            The decoded object, or ``{}`` when ``Content-Length`` is absent
            or zero.

        Raises:
            RegistryError: With status 400 when ``Content-Length`` is not a
                non-negative integer or the body is not valid UTF-8; also
                when the decoded JSON value is not an object.
            json.JSONDecodeError: When the body is not valid JSON.
        """
        declared = self.headers.get("Content-Length", "0")
        try:
            length = int(declared)
        except ValueError as exc:
            # A malformed header is a client error, not a server fault.
            raise RegistryError(f"invalid Content-Length header: {declared}", 400) from exc
        if length < 0:
            # read(-1) would read until EOF and could block on an open
            # connection, so reject negative lengths up front.
            raise RegistryError(f"invalid Content-Length header: {declared}", 400)
        if length == 0:
            return {}
        raw = self.rfile.read(length)
        try:
            text = raw.decode("utf-8")
        except UnicodeDecodeError as exc:
            raise RegistryError("request body must be UTF-8 encoded JSON", 400) from exc
        body = json.loads(text)
        if not isinstance(body, dict):
            raise RegistryError("request body must be a JSON object")
        return body

    def _send_json(self, status: int, body: Any) -> None:
        """Serialise ``body`` as indented, key-sorted JSON and send it."""
        payload = json.dumps(body, indent=2, sort_keys=True).encode("utf-8")
        self._send_bytes(status, payload, "application/json; charset=utf-8")

    def _send_text(self, status: int, body: str, content_type: str) -> None:
        """Send ``body`` encoded as UTF-8 with the given content type."""
        self._send_bytes(status, body.encode("utf-8"), content_type)

    def _send_bytes(self, status: int, payload: bytes, content_type: str) -> None:
        """Write the status line, headers and ``payload`` to the client."""
        self.send_response(status)
        self.send_header("Content-Type", content_type)
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)
|
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the registry service.

    Returns:
        A parser accepting ``--db`` (database path), ``--host`` and
        ``--port``, each with a default value.
    """
    cli = argparse.ArgumentParser(
        prog="railiance-fabric-registry",
        description="Run the Railiance Fabric ecosystem registry service.",
    )
    # Flag name paired with the keyword arguments for add_argument.
    option_specs = (
        ("--db", {"type": Path, "default": Path(".railiance-fabric/registry.sqlite3")}),
        ("--host", {"default": "127.0.0.1"}),
        ("--port", {"type": int, "default": 8765}),
    )
    for flag, settings in option_specs:
        cli.add_argument(flag, **settings)
    return cli
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """Start the registry HTTP service and serve until interrupted.

    Args:
        argv: Command-line arguments; ``None`` lets argparse read
            ``sys.argv``.

    Returns:
        Process exit code, always ``0``.
    """
    options = build_parser().parse_args(argv)
    registry = RegistryStore(options.db)
    registry.init_schema()

    # A dedicated subclass carries the store, so the shared RegistryHandler
    # class itself is never modified.
    class Handler(RegistryHandler):
        store = registry

    server = ThreadingHTTPServer((options.host, options.port), Handler)
    print(f"Railiance Fabric Registry listening on http://{options.host}:{options.port}")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        # Emit a newline so the shell prompt starts on a fresh line.
        print()
    finally:
        server.server_close()
    return 0
|
|
|
|
|
|
def _parts(path: str) -> list[str]:
|
|
return [part for part in path.strip("/").split("/") if part]
|
|
|
|
|
|
def _query_one(query: dict[str, list[str]], key: str) -> str:
|
|
values = query.get(key)
|
|
if not values or not values[0]:
|
|
raise RegistryError(f"missing query parameter: {key}")
|
|
return values[0]
|
|
|
|
|
|
def _query_optional(query: dict[str, list[str]], key: str) -> str | None:
|
|
values = query.get(key)
|
|
if not values or not values[0]:
|
|
return None
|
|
return values[0]
|
|
|
|
|
|
def _int_id(value: str, label: str) -> int:
|
|
try:
|
|
return int(value)
|
|
except ValueError as exc:
|
|
raise RegistryError(f"invalid {label}: {value}", 400) from exc
|
|
|
|
|
|
def _query_optional_int(query: dict[str, list[str]], key: str) -> int | None:
    """Return an optional query parameter parsed as an integer.

    Args:
        query: Parsed query string mapping names to value lists.
        key: Name of the parameter; also used as the label in parse errors.

    Returns:
        The parsed integer, or ``None`` when ``_query_optional`` reports the
        parameter as not provided.
    """
    raw = _query_optional(query, key)
    return None if raw is None else _int_id(raw, key)
|
|
|
|
|
|
# Script entry point: run the service and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|