"""HTTP front end for the Railiance Fabric ecosystem registry.

Exposes a ``RegistryStore`` over a small JSON (and one HTML) HTTP API built
on the standard-library ``http.server`` module, plus the command-line entry
point that starts the service.
"""

from __future__ import annotations

import argparse
import json
import sys
from dataclasses import dataclass
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any
from urllib.parse import parse_qs, urlparse

from .graph_explorer import fabric_graph_explorer_manifest, fabric_graph_explorer_payload
from .graph_explorer_ui import graph_explorer_page
from .registry import (
    RegistryError,
    RegistryStore,
    backstage_projection,
    blast_radius,
    consumers,
    dependency_path_lines,
    library_xregistry_projection,
    providers,
    unresolved_dependencies,
    xregistry_projection,
)


@dataclass(frozen=True)
class HtmlResponse:
    """Marker wrapper telling ``_handle`` to send ``body`` as HTML, not JSON."""

    body: str


class RegistryHandler(BaseHTTPRequestHandler):
    """Request handler that routes GET/POST requests to a ``RegistryStore``.

    ``store`` is a class attribute; ``main`` assigns it on a subclass before
    the server starts, so every request handled by that subclass shares it.
    """

    store: RegistryStore

    def do_GET(self) -> None:
        """Serve a GET request through the shared dispatch/error wrapper."""
        self._handle(self._get)

    def do_POST(self) -> None:
        """Serve a POST request through the shared dispatch/error wrapper."""
        self._handle(self._post)

    def log_message(self, format: str, *args: object) -> None:
        """Discard the per-request log line the base class would emit."""
        return

    def _get(self, path: str, query: dict[str, list[str]]) -> tuple[int, Any]:
        """Resolve a GET route.

        Args:
            path: URL path component of the request.
            query: Parsed query string as produced by ``parse_qs``.

        Returns:
            A ``(status, body)`` pair; ``body`` is either a JSON-serialisable
            value or an ``HtmlResponse``.

        Raises:
            RegistryError: With status 404 when no route matches, or whatever
                the query helpers / store raise for bad input.
        """
        parts = _parts(path)

        # --- service endpoints ------------------------------------------
        if path == "/health":
            return HTTPStatus.OK, {"status": "ok"}
        if parts == ["ui", "graph-explorer"]:
            return HTTPStatus.OK, HtmlResponse(graph_explorer_page())
        if path == "/status":
            return HTTPStatus.OK, self.store.status()

        # --- repositories and their snapshots ---------------------------
        if parts == ["repositories"]:
            return HTTPStatus.OK, self.store.list_repositories()
        if len(parts) == 3 and parts[0] == "repositories" and parts[2] == "inventory":
            return HTTPStatus.OK, self.store.repository_inventory(parts[1])
        if len(parts) == 2 and parts[0] == "repositories":
            return HTTPStatus.OK, self.store.get_repository(parts[1])
        if len(parts) == 3 and parts[0] == "repositories" and parts[2] == "snapshots":
            return HTTPStatus.OK, self.store.list_snapshots(parts[1])
        if len(parts) == 4 and parts[0] == "repositories" and parts[2] == "snapshots" and parts[3] == "latest":
            return HTTPStatus.OK, self.store.latest_snapshot(parts[1])
        if len(parts) == 4 and parts[0] == "repositories" and parts[2] == "snapshots" and parts[3] == "diff":
            return HTTPStatus.OK, self.store.snapshot_diff(
                parts[1],
                from_id=_query_optional_int(query, "from_id"),
                to_id=_query_optional_int(query, "to_id"),
            )

        # --- discovery snapshots ----------------------------------------
        if len(parts) == 3 and parts[0] == "repositories" and parts[2] == "discovery-snapshots":
            return HTTPStatus.OK, self.store.list_discovery_snapshots(
                parts[1],
                profile=_query_optional(query, "profile"),
            )
        if len(parts) == 4 and parts[0] == "repositories" and parts[2] == "discovery-snapshots" and parts[3] == "latest":
            return HTTPStatus.OK, self.store.latest_discovery_snapshot(
                parts[1],
                profile=_query_optional(query, "profile"),
            )
        if len(parts) == 4 and parts[0] == "repositories" and parts[2] == "discovery-snapshots" and parts[3] == "diff":
            return HTTPStatus.OK, self.store.discovery_snapshot_diff(
                parts[1],
                from_id=_query_optional_int(query, "from_id"),
                to_id=_query_optional_int(query, "to_id"),
                profile=_query_optional(query, "profile"),
            )
        # Must stay after the "latest"/"diff" routes: any other fourth
        # segment is interpreted as a numeric discovery snapshot id.
        if len(parts) == 4 and parts[0] == "repositories" and parts[2] == "discovery-snapshots":
            snapshot = self.store.get_discovery_snapshot(_int_id(parts[3], "discovery_snapshot_id"))
            # The id is looked up without the repository, so reject a
            # snapshot that belongs to a different repository slug.
            if snapshot["repo_slug"] != parts[1]:
                raise RegistryError("discovery snapshot id must belong to the requested repository", 404)
            return HTTPStatus.OK, snapshot

        # --- search and graph queries -----------------------------------
        if parts == ["search"]:
            return HTTPStatus.OK, self.store.search(_query_one(query, "q"))
        if parts == ["graph", "nodes"]:
            return HTTPStatus.OK, self.store.combined_graph()["nodes"]
        if len(parts) == 3 and parts[0] == "graph" and parts[1] == "nodes":
            return HTTPStatus.OK, self.store.graph_node_detail(parts[2])
        if parts == ["graph", "providers"]:
            return HTTPStatus.OK, providers(self.store.combined_graph(), _query_one(query, "capability_type"))
        if parts == ["graph", "consumers"]:
            return HTTPStatus.OK, consumers(self.store.combined_graph(), _query_one(query, "target"))
        if parts == ["graph", "unresolved"]:
            return HTTPStatus.OK, unresolved_dependencies(self.store.combined_graph())
        if parts == ["graph", "blast-radius"]:
            return HTTPStatus.OK, blast_radius(self.store.combined_graph(), _query_one(query, "interface_id"))
        if parts == ["graph", "dependency-path"]:
            return HTTPStatus.OK, {
                "lines": dependency_path_lines(self.store.combined_graph(), _query_one(query, "service_id"))
            }

        # --- exports ----------------------------------------------------
        if parts == ["exports", "state-hub"]:
            return HTTPStatus.OK, self.store.combined_graph()
        if parts == ["exports", "reset-archive"]:
            return HTTPStatus.OK, self.store.reset_archive()
        if parts == ["exports", "backstage"]:
            return HTTPStatus.OK, backstage_projection(self.store.combined_graph())
        if parts == ["exports", "xregistry"]:
            return HTTPStatus.OK, xregistry_projection(self.store.combined_graph())
        if parts == ["exports", "graph-explorer"]:
            return HTTPStatus.OK, fabric_graph_explorer_payload(
                self.store.combined_graph(),
                self.store.list_repositories(),
                {str(snapshot["repo_slug"]) for snapshot in self.store.latest_snapshots()},
            )
        if parts == ["exports", "graph-explorer", "manifest"]:
            return HTTPStatus.OK, fabric_graph_explorer_manifest()

        # --- artifacts --------------------------------------------------
        if parts == ["artifacts"]:
            return HTTPStatus.OK, self.store.list_artifacts(
                repo_slug=_query_optional(query, "repo_slug"),
                target_id=_query_optional(query, "target_id"),
                # "type" is accepted as a fallback for "artifact_type".
                artifact_type=_query_optional(query, "artifact_type") or _query_optional(query, "type"),
            )
        if len(parts) == 2 and parts[0] == "artifacts":
            return HTTPStatus.OK, self.store.get_artifact(_int_id(parts[1], "artifact_id"))

        # --- libraries --------------------------------------------------
        if parts == ["libraries"]:
            return HTTPStatus.OK, self.store.list_libraries(
                repo_slug=_query_optional(query, "repo_slug"),
                name=_query_optional(query, "name"),
                purl=_query_optional(query, "purl"),
                # "type" is accepted as a fallback for "component_type".
                component_type=_query_optional(query, "component_type") or _query_optional(query, "type"),
            )
        if len(parts) == 2 and parts[0] == "libraries":
            return HTTPStatus.OK, self.store.get_library(_int_id(parts[1], "library_id"))
        if len(parts) == 3 and parts[0] == "repositories" and parts[2] == "libraries":
            return HTTPStatus.OK, self.store.list_libraries(repo_slug=parts[1])
        if parts == ["exports", "libraries", "xregistry"]:
            return HTTPStatus.OK, library_xregistry_projection(self.store.list_libraries())

        raise RegistryError(f"route not found: {path}", 404)

    def _post(self, path: str, _query: dict[str, list[str]]) -> tuple[int, Any]:
        """Resolve a POST route.

        The JSON body is read before route matching, so a malformed body is
        reported even when the path itself is unknown.

        Args:
            path: URL path component of the request.
            _query: Parsed query string; unused by POST routes.

        Returns:
            A ``(status, body)`` pair; every matched route answers 201.

        Raises:
            RegistryError: With status 404 when no route matches, or whatever
                ``_read_json`` / the store raise for bad input.
            json.JSONDecodeError: When the request body is not valid JSON.
        """
        parts = _parts(path)
        body = self._read_json()
        if parts == ["repositories"]:
            return HTTPStatus.CREATED, self.store.upsert_repository(body)
        if len(parts) == 3 and parts[0] == "repositories" and parts[2] == "snapshots":
            return HTTPStatus.CREATED, self.store.add_snapshot(parts[1], body)
        if len(parts) == 3 and parts[0] == "repositories" and parts[2] == "discovery-snapshots":
            return HTTPStatus.CREATED, self.store.add_discovery_snapshot(parts[1], body)
        if len(parts) == 5 and parts[0] == "repositories" and parts[2] == "discovery-snapshots" and parts[4] == "accept":
            return HTTPStatus.CREATED, self.store.accept_discovery_snapshot(
                parts[1],
                _int_id(parts[3], "discovery_snapshot_id"),
                body,
            )
        if len(parts) == 4 and parts[0] == "repositories" and parts[2] == "libraries" and parts[3] == "cyclonedx":
            return HTTPStatus.CREATED, self.store.ingest_cyclonedx(parts[1], body)
        if parts == ["admin", "reset-graph-data"]:
            return HTTPStatus.CREATED, self.store.reset_graph_data(body)
        if parts == ["artifacts"]:
            return HTTPStatus.CREATED, self.store.add_artifact(body)
        raise RegistryError(f"route not found: {path}", 404)

    def _handle(self, action: Any) -> None:
        """Run a route resolver and translate its outcome into a response.

        Args:
            action: Callable taking ``(path, query)`` and returning a
                ``(status, body)`` pair (``_get`` or ``_post``).

        ``RegistryError`` is answered with its own status code, a JSON decode
        failure with 400, and any other exception with 500; in every case the
        body is a JSON object with a single ``"error"`` key.
        """
        parsed = urlparse(self.path)
        query = parse_qs(parsed.query)
        try:
            status, body = action(parsed.path, query)
            if isinstance(body, HtmlResponse):
                self._send_text(int(status), body.body, "text/html; charset=utf-8")
            else:
                self._send_json(int(status), body)
        except RegistryError as exc:
            self._send_json(exc.status_code, {"error": exc.message})
        except json.JSONDecodeError as exc:
            self._send_json(400, {"error": f"invalid JSON request body: {exc}"})
        except Exception as exc:
            # Last-resort boundary so a failing request still gets a response.
            self._send_json(500, {"error": str(exc)})

    def _read_json(self) -> dict[str, Any]:
        """Read the request body and decode it as a JSON object.

        Returns:
            The decoded object, or ``{}`` when ``Content-Length`` is absent
            or zero.

        Raises:
            RegistryError: With status 400 when ``Content-Length`` is not a
                non-negative integer or the body is not valid UTF-8; with the
                error's default status when the JSON value is not an object.
            json.JSONDecodeError: When the body is not valid JSON.
        """
        raw_length = self.headers.get("Content-Length", "0")
        try:
            length = int(raw_length)
        except ValueError as exc:
            # A malformed header is a client error, not a server failure.
            raise RegistryError(f"invalid Content-Length header: {raw_length}", 400) from exc
        if length < 0:
            # read() with a negative size reads until EOF, which would stall
            # the request until the client closes the connection.
            raise RegistryError(f"invalid Content-Length header: {raw_length}", 400)
        if length == 0:
            return {}
        raw = self.rfile.read(length)
        try:
            text = raw.decode("utf-8")
        except UnicodeDecodeError as exc:
            raise RegistryError("request body must be UTF-8 encoded JSON", 400) from exc
        body = json.loads(text)
        if not isinstance(body, dict):
            raise RegistryError("request body must be a JSON object")
        return body

    def _send_json(self, status: int, body: Any) -> None:
        """Serialise ``body`` as indented, key-sorted JSON and send it."""
        payload = json.dumps(body, indent=2, sort_keys=True).encode("utf-8")
        self._send_bytes(status, payload, "application/json; charset=utf-8")

    def _send_text(self, status: int, body: str, content_type: str) -> None:
        """Send ``body`` UTF-8 encoded with the given ``content_type``."""
        self._send_bytes(status, body.encode("utf-8"), content_type)

    def _send_bytes(self, status: int, payload: bytes, content_type: str) -> None:
        """Write the status line, content headers and ``payload`` to the client."""
        self.send_response(status)
        self.send_header("Content-Type", content_type)
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)


def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the registry service.

    Returns:
        A parser accepting ``--db``, ``--host`` and ``--port``.
    """
    parser = argparse.ArgumentParser(
        prog="railiance-fabric-registry",
        description="Run the Railiance Fabric ecosystem registry service.",
    )
    parser.add_argument("--db", type=Path, default=Path(".railiance-fabric/registry.sqlite3"))
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("--port", type=int, default=8765)
    return parser


def main(argv: list[str] | None = None) -> int:
    """Start the registry HTTP server and serve until interrupted.

    Args:
        argv: Command-line arguments; ``None`` lets argparse read ``sys.argv``.

    Returns:
        Process exit code, always ``0``.
    """
    args = build_parser().parse_args(argv)
    store = RegistryStore(args.db)
    store.init_schema()

    # Bind the store on a fresh subclass so RegistryHandler itself stays
    # untouched by this process-specific state.
    class Handler(RegistryHandler):
        pass

    Handler.store = store
    server = ThreadingHTTPServer((args.host, args.port), Handler)
    print(f"Railiance Fabric Registry listening on http://{args.host}:{args.port}")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        # End the terminal line that holds the echoed ^C.
        print()
        return 0
    finally:
        server.server_close()
    return 0


def _parts(path: str) -> list[str]:
    """Split a URL path into its non-empty ``/``-separated segments."""
    return [part for part in path.strip("/").split("/") if part]


def _query_one(query: dict[str, list[str]], key: str) -> str:
    """Return the first value of a required query parameter.

    Raises:
        RegistryError: When the parameter is absent or its first value is empty.
    """
    values = query.get(key)
    if not values or not values[0]:
        raise RegistryError(f"missing query parameter: {key}")
    return values[0]


def _query_optional(query: dict[str, list[str]], key: str) -> str | None:
    """Return the first value of a query parameter, or ``None`` if absent/empty."""
    values = query.get(key)
    if not values or not values[0]:
        return None
    return values[0]


def _int_id(value: str, label: str) -> int:
    """Parse ``value`` as an integer identifier.

    Args:
        value: Text to parse.
        label: Name used for the identifier in the error message.

    Raises:
        RegistryError: With status 400 when ``value`` is not an integer.
    """
    try:
        return int(value)
    except ValueError as exc:
        raise RegistryError(f"invalid {label}: {value}", 400) from exc


def _query_optional_int(query: dict[str, list[str]], key: str) -> int | None:
    """Return an optional query parameter parsed as an integer.

    Returns ``None`` when the parameter is absent or empty.

    Raises:
        RegistryError: With status 400 when the value is not an integer.
    """
    value = _query_optional(query, key)
    if value is None:
        return None
    return _int_id(value, key)


if __name__ == "__main__":
    sys.exit(main())