Add registry inventory and drift views

This commit is contained in:
2026-05-17 20:49:28 +02:00
parent 3bf22e18ba
commit d1c003411f
6 changed files with 332 additions and 0 deletions

View File

@@ -184,6 +184,20 @@ class RegistryStore:
raise RegistryError(f"snapshot not found: {snapshot_id}", 404)
return _snapshot_dict(row)
def list_snapshots(self, repo_slug: str) -> list[dict[str, Any]]:
self.get_repository(repo_slug)
with self._connect() as db:
rows = db.execute(
"""
select id, repo_slug, commit_sha, generated_at, graph_json, created_at
from snapshots
where repo_slug = ?
order by id desc
""",
(repo_slug,),
).fetchall()
return [_snapshot_summary(row) for row in rows]
def latest_snapshot(self, repo_slug: str) -> dict[str, Any]:
self.get_repository(repo_slug)
with self._connect() as db:
@@ -337,6 +351,39 @@ class RegistryStore:
enriched["artifacts"] = self.list_artifacts(target_id=graph_id)
return enriched
def repository_inventory(self, repo_slug: str) -> dict[str, Any]:
repository = self.get_repository(repo_slug)
try:
latest_snapshot = self.latest_snapshot(repo_slug)
except RegistryError as exc:
if exc.status_code != 404:
raise
latest_snapshot = None
graph = latest_snapshot["graph"] if latest_snapshot else _empty_graph()
nodes = [node for node in graph.get("nodes", []) if isinstance(node, dict)]
edges = [edge for edge in graph.get("edges", []) if isinstance(edge, dict)]
artifacts = self.list_artifacts(repo_slug=repo_slug)
libraries = self.list_libraries(repo_slug=repo_slug)
return {
"repository": repository,
"latest_snapshot": _snapshot_public_summary(latest_snapshot) if latest_snapshot else None,
"counts": {
"snapshots": len(self.list_snapshots(repo_slug)),
"nodes": len(nodes),
"edges": len(edges),
"artifacts": len(artifacts),
"libraries": len(libraries),
},
"graph": {
"nodes": nodes,
"edges": edges,
"owner_repos": sorted({str(node.get("repo", "")) for node in nodes if node.get("repo")}),
},
"artifacts": artifacts,
"libraries": libraries,
}
def ingest_cyclonedx(self, repo_slug: str, payload: dict[str, Any]) -> dict[str, Any]:
self.get_repository(repo_slug)
bom = payload.get("bom") if "bom" in payload else payload
@@ -428,6 +475,61 @@ class RegistryStore:
raise RegistryError(f"library not found: {library_id}", 404)
return _library_dict(row)
def snapshot_diff(
self,
repo_slug: str,
from_id: int | None = None,
to_id: int | None = None,
) -> dict[str, Any]:
self.get_repository(repo_slug)
if from_id is None or to_id is None:
snapshots = self.list_snapshots(repo_slug)
if len(snapshots) < 2:
raise RegistryError(f"at least two snapshots are required for diff: {repo_slug}", 404)
to_id = snapshots[0]["id"] if to_id is None else to_id
from_id = snapshots[1]["id"] if from_id is None else from_id
before = self.get_snapshot(from_id)
after = self.get_snapshot(to_id)
if before["repo_slug"] != repo_slug or after["repo_slug"] != repo_slug:
raise RegistryError("snapshot ids must belong to the requested repository")
return {
"repo_slug": repo_slug,
"from": _snapshot_public_summary(before),
"to": _snapshot_public_summary(after),
"graph": _graph_diff(before["graph"], after["graph"]),
}
def search(self, query: str) -> dict[str, Any]:
needle = query.strip().lower()
if not needle:
raise RegistryError("query parameter 'q' is required")
graph = self.combined_graph()
return {
"query": query,
"nodes": [
node
for node in graph.get("nodes", [])
if isinstance(node, dict) and _matches(needle, node)
],
"artifacts": [
artifact
for artifact in self.list_artifacts()
if _matches(needle, artifact)
],
"libraries": [
library
for library in self.list_libraries()
if _matches(needle, library)
],
"repositories": [
repository
for repository in self.list_repositories()
if _matches(needle, repository)
],
}
def _connect(self) -> sqlite3.Connection:
db = sqlite3.connect(self.path)
db.row_factory = sqlite3.Row
@@ -771,6 +873,32 @@ def _snapshot_dict(row: sqlite3.Row) -> dict[str, Any]:
}
def _snapshot_summary(row: sqlite3.Row) -> dict[str, Any]:
graph = json.loads(row["graph_json"])
return {
"id": row["id"],
"repo_slug": row["repo_slug"],
"commit": row["commit_sha"],
"generated_at": row["generated_at"],
"created_at": row["created_at"],
"node_count": len(graph.get("nodes", [])),
"edge_count": len(graph.get("edges", [])),
}
def _snapshot_public_summary(snapshot: dict[str, Any]) -> dict[str, Any]:
graph = snapshot["graph"]
return {
"id": snapshot["id"],
"repo_slug": snapshot["repo_slug"],
"commit": snapshot["commit"],
"generated_at": snapshot["generated_at"],
"created_at": snapshot["created_at"],
"node_count": len(graph.get("nodes", [])),
"edge_count": len(graph.get("edges", [])),
}
def _row_dict(row: sqlite3.Row) -> dict[str, Any]:
return {key: row[key] for key in row.keys()}
@@ -878,6 +1006,61 @@ def _normalize_licenses(raw: Any) -> list[dict[str, Any]]:
return normalized
def _empty_graph() -> dict[str, Any]:
return {
"apiVersion": "railiance.fabric/v1alpha1",
"kind": "FabricGraphExport",
"nodes": [],
"edges": [],
}
def _graph_diff(before: dict[str, Any], after: dict[str, Any]) -> dict[str, Any]:
before_nodes = _nodes_by_id(before)
after_nodes = _nodes_by_id(after)
before_edges = {_edge_key(edge): edge for edge in before.get("edges", []) if isinstance(edge, dict)}
after_edges = {_edge_key(edge): edge for edge in after.get("edges", []) if isinstance(edge, dict)}
added_node_ids = sorted(set(after_nodes) - set(before_nodes))
removed_node_ids = sorted(set(before_nodes) - set(after_nodes))
common_node_ids = sorted(set(before_nodes) & set(after_nodes))
changed_node_ids = [
node_id
for node_id in common_node_ids
if _stable_json(before_nodes[node_id]) != _stable_json(after_nodes[node_id])
]
added_edge_keys = sorted(set(after_edges) - set(before_edges))
removed_edge_keys = sorted(set(before_edges) - set(after_edges))
return {
"added_nodes": [after_nodes[node_id] for node_id in added_node_ids],
"removed_nodes": [before_nodes[node_id] for node_id in removed_node_ids],
"changed_nodes": [
{
"id": node_id,
"before": before_nodes[node_id],
"after": after_nodes[node_id],
}
for node_id in changed_node_ids
],
"added_edges": [after_edges[key] for key in added_edge_keys],
"removed_edges": [before_edges[key] for key in removed_edge_keys],
}
def _edge_key(edge: dict[str, Any]) -> tuple[str, str, str]:
return (str(edge.get("from", "")), str(edge.get("to", "")), str(edge.get("type", "")))
def _stable_json(value: Any) -> str:
return json.dumps(value, sort_keys=True, separators=(",", ":"))
def _matches(needle: str, value: Any) -> bool:
return needle in _stable_json(value).lower()
def _required_text(payload: dict[str, Any], key: str, fallback_key: str | None = None) -> str:
value = payload.get(key)
if value is None and fallback_key is not None:

View File

@@ -41,10 +41,22 @@ class RegistryHandler(BaseHTTPRequestHandler):
return HTTPStatus.OK, {"status": "ok"}
if parts == ["repositories"]:
return HTTPStatus.OK, self.store.list_repositories()
if len(parts) == 3 and parts[0] == "repositories" and parts[2] == "inventory":
return HTTPStatus.OK, self.store.repository_inventory(parts[1])
if len(parts) == 2 and parts[0] == "repositories":
return HTTPStatus.OK, self.store.get_repository(parts[1])
if len(parts) == 3 and parts[0] == "repositories" and parts[2] == "snapshots":
return HTTPStatus.OK, self.store.list_snapshots(parts[1])
if len(parts) == 4 and parts[0] == "repositories" and parts[2] == "snapshots" and parts[3] == "latest":
return HTTPStatus.OK, self.store.latest_snapshot(parts[1])
if len(parts) == 4 and parts[0] == "repositories" and parts[2] == "snapshots" and parts[3] == "diff":
return HTTPStatus.OK, self.store.snapshot_diff(
parts[1],
from_id=_query_optional_int(query, "from_id"),
to_id=_query_optional_int(query, "to_id"),
)
if parts == ["search"]:
return HTTPStatus.OK, self.store.search(_query_one(query, "q"))
if parts == ["graph", "nodes"]:
return HTTPStatus.OK, self.store.combined_graph()["nodes"]
if len(parts) == 3 and parts[0] == "graph" and parts[1] == "nodes":
@@ -190,5 +202,12 @@ def _int_id(value: str, label: str) -> int:
raise RegistryError(f"invalid {label}: {value}", 400) from exc
def _query_optional_int(query: dict[str, list[str]], key: str) -> int | None:
value = _query_optional(query, key)
if value is None:
return None
return _int_id(value, key)
if __name__ == "__main__":
sys.exit(main())