Add registry feed and library inventory

This commit is contained in:
2026-05-17 20:41:31 +02:00
parent faff5fb728
commit 3bf22e18ba
7 changed files with 549 additions and 0 deletions

View File

@@ -1,9 +1,16 @@
from __future__ import annotations
import argparse
import json
import re
import subprocess
import sys
import urllib.error
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
from .loader import load_yaml
from .graph import FabricGraph, build_graph
from .validation import validate_roots
@@ -53,6 +60,26 @@ def build_parser() -> argparse.ArgumentParser:
export = sub.add_parser("export", help="Export graph as JSON or Mermaid.")
export.add_argument("paths", nargs="*", type=Path, default=[Path(".")])
export.add_argument("--format", choices=["json", "mermaid"], default="json")
registry = sub.add_parser("registry", help="Feed a running Railiance Fabric registry service.")
registry_sub = registry.add_subparsers(dest="registry_command", required=True)
sync = registry_sub.add_parser("sync", help="Register a repo and ingest its current graph snapshot.")
sync.add_argument("paths", nargs="*", type=Path, default=[Path(".")])
sync.add_argument("--registry-url", default="http://127.0.0.1:8765")
sync.add_argument("--repo-slug", default=None)
sync.add_argument("--name", default=None)
sync.add_argument("--remote-url", default=None)
sync.add_argument("--default-branch", default="main")
sync.add_argument("--state-hub-repo-id", default=None)
sync.add_argument("--commit", default=None)
sync.add_argument("--json", action="store_true", help="Print the raw snapshot response.")
cyclonedx = registry_sub.add_parser("ingest-cyclonedx", help="Ingest a CycloneDX SBOM as library inventory.")
cyclonedx.add_argument("sbom", type=Path)
cyclonedx.add_argument("--registry-url", default="http://127.0.0.1:8765")
cyclonedx.add_argument("--repo-slug", required=True)
cyclonedx.add_argument("--json", action="store_true", help="Print the raw ingest response.")
return parser
@@ -101,9 +128,128 @@ def main(argv: list[str] | None = None) -> int:
print(graph.to_mermaid() if args.format == "mermaid" else graph.to_json())
return 0
if args.command == "registry":
if args.registry_command == "sync":
return _registry_sync(args)
if args.registry_command == "ingest-cyclonedx":
return _registry_ingest_cyclonedx(args)
parser.error(f"unknown command {args.command!r}")
return 2
def _registry_sync(args: argparse.Namespace) -> int:
    """Validate the given roots, then register the repo and post a graph snapshot.

    Every validation diagnostic is echoed to stderr. When the validation
    report carries errors, its summary is printed to stderr and 1 is returned
    before any registry request is made. Otherwise the repository is posted
    to ``/repositories`` and the exported graph to
    ``/repositories/<slug>/snapshots``.

    Args:
        args: Parsed ``registry sync`` arguments (paths, registry URL and the
            optional repository overrides).

    Returns:
        0 after both requests succeeded, 1 when validation reported errors.
    """
    report = validate_roots(args.paths)
    for item in report.diagnostics:
        print(item.format(), file=sys.stderr)
    if report.errors:
        print(report.summary(), file=sys.stderr)
        return 1

    graph = _load_graph_or_exit(args.paths)
    repo_path = _primary_repo_path(args.paths)
    # An explicit --repo-slug wins; otherwise derive one from the directory name.
    repo_slug = args.repo_slug or _slugify(repo_path.name)

    # Explicit CLI values take precedence; git is only consulted as a fallback.
    remote_url = args.remote_url or _git_value(repo_path, "config", "--get", "remote.origin.url")
    repository_payload = {
        "slug": repo_slug,
        "name": args.name or repo_path.name,
        "remote_url": remote_url,
        "default_branch": args.default_branch,
        "state_hub_repo_id": args.state_hub_repo_id,
    }
    repository = _registry_post(args.registry_url, "/repositories", repository_payload)

    # Outside a git checkout there is no HEAD, so label the snapshot "working-tree".
    commit = args.commit or _git_value(repo_path, "rev-parse", "HEAD") or "working-tree"
    snapshot_payload = {
        "commit": commit,
        "generated_at": _utc_now(),
        "graph": graph.to_export(),
    }
    snapshot = _registry_post(
        args.registry_url,
        f"/repositories/{repo_slug}/snapshots",
        snapshot_payload,
    )

    if args.json:
        print(json.dumps({"repository": repository, "snapshot": snapshot}, indent=2, sort_keys=True))
        return 0
    print(f"registered {repository['slug']}")
    print(f"snapshot {snapshot['id']} accepted for {snapshot['commit']}")
    return 0
def _registry_ingest_cyclonedx(args: argparse.Namespace) -> int:
    """Load an SBOM file and post it to the registry's CycloneDX ingest route.

    The file is read with ``load_yaml``. Anything that does not load as a
    dict is rejected with an error on stderr and a return value of 1.

    Args:
        args: Parsed ``registry ingest-cyclonedx`` arguments.

    Returns:
        0 after the registry accepted the SBOM, 1 when the file content is
        not a mapping.
    """
    sbom = load_yaml(args.sbom)
    if not isinstance(sbom, dict):
        print(f"ERROR {args.sbom}: CycloneDX SBOM must be a mapping/object", file=sys.stderr)
        return 1

    endpoint = f"/repositories/{args.repo_slug}/libraries/cyclonedx"
    response = _registry_post(args.registry_url, endpoint, sbom)

    # --json prints the raw response; otherwise print a one-line summary.
    if args.json:
        output = json.dumps(response, indent=2, sort_keys=True)
    else:
        output = f"ingested {response['component_count']} library component(s) for {response['repo_slug']}"
    print(output)
    return 0
def _registry_post(registry_url: str, path: str, payload: dict[str, object]) -> dict[str, object]:
data = json.dumps({key: value for key, value in payload.items() if value is not None}).encode("utf-8")
request = urllib.request.Request(
registry_url.rstrip("/") + path,
data=data,
headers={"Content-Type": "application/json"},
method="POST",
)
try:
with urllib.request.urlopen(request, timeout=15) as response:
body = json.loads(response.read())
except urllib.error.HTTPError as exc:
detail = exc.read().decode("utf-8", errors="replace")
print(f"ERROR registry request failed ({exc.code}): {detail}", file=sys.stderr)
raise SystemExit(1) from exc
except urllib.error.URLError as exc:
print(f"ERROR cannot reach registry at {registry_url}: {exc}", file=sys.stderr)
raise SystemExit(1) from exc
if not isinstance(body, dict):
print("ERROR registry returned a non-object response", file=sys.stderr)
raise SystemExit(1)
return body
def _primary_repo_path(paths: list[Path]) -> Path:
if not paths:
return Path(".").resolve()
path = paths[0].resolve()
return path.parent if path.is_file() else path
def _slugify(value: str) -> str:
return re.sub(r"-+", "-", re.sub(r"[^a-z0-9]", "-", value.lower())).strip("-") or "repo"
def _git_value(repo_path: Path, *args: str) -> str | None:
try:
result = subprocess.run(
["git", *args],
cwd=repo_path,
check=False,
capture_output=True,
text=True,
timeout=5,
)
except (OSError, subprocess.TimeoutExpired):
return None
if result.returncode != 0:
return None
value = result.stdout.strip()
return value or None
def _utc_now() -> str:
return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
def _load_graph_or_exit(paths: list[Path]) -> FabricGraph:
graph = build_graph(paths)
if graph.load_errors:

View File

@@ -71,6 +71,27 @@ class RegistryStore:
create index if not exists idx_artifacts_target
on artifacts(target_id);
create table if not exists libraries (
id integer primary key autoincrement,
repo_slug text not null references repositories(slug),
bom_ref text,
component_type text not null,
name text not null,
version text,
purl text,
scope text,
licenses_json text not null,
hashes_json text not null,
metadata_json text not null,
created_at text not null
);
create index if not exists idx_libraries_repo
on libraries(repo_slug);
create index if not exists idx_libraries_purl
on libraries(purl);
"""
)
@@ -316,6 +337,97 @@ class RegistryStore:
enriched["artifacts"] = self.list_artifacts(target_id=graph_id)
return enriched
def ingest_cyclonedx(self, repo_slug: str, payload: dict[str, Any]) -> dict[str, Any]:
    """Replace the stored library inventory of *repo_slug* with a CycloneDX BOM.

    The BOM is read from ``payload["bom"]`` when that key is present,
    otherwise *payload* itself is treated as the BOM. All existing
    ``libraries`` rows for the repository are deleted and one row is
    inserted per extracted entry, all stamped with the same ``created_at``.

    Returns:
        A dict with ``repo_slug``, ``component_count`` (number of extracted
        entries) and ``libraries`` (the inventory as listed after the write).

    Raises:
        RegistryError: If the BOM is not a dict, or declares a truthy
            ``bomFormat`` other than ``"CycloneDX"``.
    """
    # Called only for its effect; presumably raises for an unknown slug
    # (confirm in get_repository).
    self.get_repository(repo_slug)

    bom = payload["bom"] if "bom" in payload else payload
    if not isinstance(bom, dict):
        raise RegistryError("CycloneDX payload must be an object")
    declared_format = bom.get("bomFormat")
    if declared_format and declared_format != "CycloneDX":
        raise RegistryError("CycloneDX payload must have bomFormat 'CycloneDX'")

    entries = _cyclonedx_entries(bom)
    now = _utc_now()
    rows = [
        (
            repo_slug,
            entry["bom_ref"],
            entry["component_type"],
            entry["name"],
            entry["version"],
            entry["purl"],
            entry["scope"],
            json.dumps(entry["licenses"], sort_keys=True),
            json.dumps(entry["hashes"], sort_keys=True),
            json.dumps(entry["metadata"], sort_keys=True),
            now,
        )
        for entry in entries
    ]

    with self._connect() as db:
        # Delete and insert on the same connection so the swap is one unit.
        db.execute("delete from libraries where repo_slug = ?", (repo_slug,))
        db.executemany(
            """
            insert into libraries (
            repo_slug, bom_ref, component_type, name, version, purl,
            scope, licenses_json, hashes_json, metadata_json, created_at
            )
            values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            rows,
        )

    return {
        "repo_slug": repo_slug,
        "component_count": len(entries),
        "libraries": self.list_libraries(repo_slug=repo_slug),
    }
def list_libraries(
    self,
    repo_slug: str | None = None,
    name: str | None = None,
    purl: str | None = None,
    component_type: str | None = None,
) -> list[dict[str, Any]]:
    """Return stored libraries, optionally narrowed by exact-match filters.

    Each truthy argument adds an equality condition on the column of the
    same name; falsy arguments (``None`` or ``""``) add none. Results are
    ordered by ``repo_slug, name, version, id``.
    """
    candidates = (
        ("repo_slug = ?", repo_slug),
        ("name = ?", name),
        ("purl = ?", purl),
        ("component_type = ?", component_type),
    )
    active = [(clause, wanted) for clause, wanted in candidates if wanted]
    conditions: list[str] = [clause for clause, _ in active]
    params: list[str] = [wanted for _, wanted in active]

    where = ""
    if conditions:
        where = " where " + " and ".join(conditions)

    query = (
        """
        select id, repo_slug, bom_ref, component_type, name, version, purl,
        scope, licenses_json, hashes_json, metadata_json, created_at
        from libraries
        """
        + where
        + " order by repo_slug, name, version, id"
    )
    with self._connect() as db:
        rows = db.execute(query, params).fetchall()
    return [_library_dict(row) for row in rows]
def get_library(self, library_id: int) -> dict[str, Any]:
    """Return the library row with primary key *library_id* as a dict.

    Raises:
        RegistryError: With code 404 when no row has that id.
    """
    query = """
        select id, repo_slug, bom_ref, component_type, name, version, purl,
        scope, licenses_json, hashes_json, metadata_json, created_at
        from libraries
        where id = ?
        """
    with self._connect() as db:
        found = db.execute(query, (library_id,)).fetchone()
    if found is not None:
        return _library_dict(found)
    raise RegistryError(f"library not found: {library_id}", 404)
def _connect(self) -> sqlite3.Connection:
db = sqlite3.connect(self.path)
db.row_factory = sqlite3.Row
@@ -614,6 +726,30 @@ def xregistry_projection(graph: dict[str, Any]) -> dict[str, Any]:
}
def library_xregistry_projection(libraries: list[dict[str, Any]]) -> dict[str, Any]:
    """Project library rows into a ``LibraryXRegistryProjection`` document.

    Each library becomes one resource in a single ``libraries`` group. Its
    identifier is the first truthy value among ``purl``, ``bom_ref`` and
    ``name``, and the resource key is that identifier passed through
    ``_xregistry_key``. Libraries whose identifiers map to the same key
    overwrite each other; the last one wins.
    """
    group = _xregistry_group("library", "libraries")
    resources = group["resources"]
    for library in libraries:
        identifier = library.get("purl") or library.get("bom_ref") or library.get("name", "")
        key = _xregistry_key(str(identifier))
        attributes = {
            "repo": library.get("repo_slug", ""),
            "bom_ref": library.get("bom_ref", ""),
            "component_type": library.get("component_type", ""),
            "scope": library.get("scope", ""),
            "licenses": library.get("licenses", []),
            "hashes": library.get("hashes", []),
        }
        resources[key] = {
            "id": identifier,
            "name": library.get("name", ""),
            # Libraries without a recorded version are reported as "unknown".
            "versionid": library.get("version") or "unknown",
            "attributes": attributes,
        }
    return {
        "apiVersion": "railiance.fabric/v1alpha1",
        "kind": "LibraryXRegistryProjection",
        "groups": {"libraries": group},
    }
def _with_source(graph: dict[str, Any], repo_slug: str, commit: str, generated_at: str) -> dict[str, Any]:
copy = json.loads(json.dumps(graph))
copy.setdefault("generated_at", generated_at)
@@ -656,6 +792,92 @@ def _artifact_dict(row: sqlite3.Row) -> dict[str, Any]:
}
def _library_dict(row: sqlite3.Row) -> dict[str, Any]:
return {
"id": row["id"],
"repo_slug": row["repo_slug"],
"bom_ref": row["bom_ref"],
"component_type": row["component_type"],
"name": row["name"],
"version": row["version"],
"purl": row["purl"],
"scope": row["scope"],
"licenses": json.loads(row["licenses_json"]),
"hashes": json.loads(row["hashes_json"]),
"metadata": json.loads(row["metadata_json"]),
"created_at": row["created_at"],
}
def _cyclonedx_entries(bom: dict[str, Any]) -> list[dict[str, Any]]:
entries: list[dict[str, Any]] = []
for component in bom.get("components", []):
if not isinstance(component, dict):
continue
name = str(component.get("name") or "").strip()
if not name:
continue
entries.append(
{
"bom_ref": _optional_component_text(component, "bom-ref"),
"component_type": str(component.get("type") or "library"),
"name": name,
"version": _optional_component_text(component, "version"),
"purl": _optional_component_text(component, "purl"),
"scope": _optional_component_text(component, "scope"),
"licenses": _normalize_licenses(component.get("licenses", [])),
"hashes": component.get("hashes", []) if isinstance(component.get("hashes", []), list) else [],
"metadata": {
"group": component.get("group"),
"publisher": component.get("publisher"),
"description": component.get("description"),
"externalReferences": component.get("externalReferences", []),
},
}
)
for service in bom.get("services", []):
if not isinstance(service, dict):
continue
name = str(service.get("name") or "").strip()
if not name:
continue
entries.append(
{
"bom_ref": _optional_component_text(service, "bom-ref"),
"component_type": "service",
"name": name,
"version": _optional_component_text(service, "version"),
"purl": None,
"scope": None,
"licenses": [],
"hashes": [],
"metadata": {
"provider": service.get("provider"),
"endpoints": service.get("endpoints", []),
"externalReferences": service.get("externalReferences", []),
},
}
)
return entries
def _optional_component_text(component: dict[str, Any], key: str) -> str | None:
value = component.get(key)
if value is None:
return None
return str(value)
def _normalize_licenses(raw: Any) -> list[dict[str, Any]]:
if not isinstance(raw, list):
return []
normalized = []
for item in raw:
if isinstance(item, dict):
normalized.append(item)
return normalized
def _required_text(payload: dict[str, Any], key: str, fallback_key: str | None = None) -> str:
value = payload.get(key)
if value is None and fallback_key is not None:

View File

@@ -16,6 +16,7 @@ from .registry import (
blast_radius,
consumers,
dependency_path_lines,
library_xregistry_projection,
providers,
unresolved_dependencies,
xregistry_projection,
@@ -72,6 +73,19 @@ class RegistryHandler(BaseHTTPRequestHandler):
)
if len(parts) == 2 and parts[0] == "artifacts":
return HTTPStatus.OK, self.store.get_artifact(_int_id(parts[1], "artifact_id"))
if parts == ["libraries"]:
return HTTPStatus.OK, self.store.list_libraries(
repo_slug=_query_optional(query, "repo_slug"),
name=_query_optional(query, "name"),
purl=_query_optional(query, "purl"),
component_type=_query_optional(query, "component_type") or _query_optional(query, "type"),
)
if len(parts) == 2 and parts[0] == "libraries":
return HTTPStatus.OK, self.store.get_library(_int_id(parts[1], "library_id"))
if len(parts) == 3 and parts[0] == "repositories" and parts[2] == "libraries":
return HTTPStatus.OK, self.store.list_libraries(repo_slug=parts[1])
if parts == ["exports", "libraries", "xregistry"]:
return HTTPStatus.OK, library_xregistry_projection(self.store.list_libraries())
raise RegistryError(f"route not found: {path}", 404)
def _post(self, path: str, _query: dict[str, list[str]]) -> tuple[int, Any]:
@@ -81,6 +95,8 @@ class RegistryHandler(BaseHTTPRequestHandler):
return HTTPStatus.CREATED, self.store.upsert_repository(body)
if len(parts) == 3 and parts[0] == "repositories" and parts[2] == "snapshots":
return HTTPStatus.CREATED, self.store.add_snapshot(parts[1], body)
if len(parts) == 4 and parts[0] == "repositories" and parts[2] == "libraries" and parts[3] == "cyclonedx":
return HTTPStatus.CREATED, self.store.ingest_cyclonedx(parts[1], body)
if parts == ["artifacts"]:
return HTTPStatus.CREATED, self.store.add_artifact(body)
raise RegistryError(f"route not found: {path}", 404)