from __future__ import annotations import json import hashlib import re import sqlite3 from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path from typing import Any from .canon import DISPLAY_ONLY_EDGE_TYPES, edge_canon_mapping, node_canon_mapping from .financial import ( financial_graph_errors, is_financial_graph_export, materialize_financial_graph_export, merge_financial_graph_exports, ) from .loader import repo_root from .schema_validation import draft202012_validator RESET_CONFIRMATION_TOKEN = "RESET-RAILIANCE-FABRIC-GRAPH-DATA" class RegistryError(Exception): def __init__(self, message: str, status_code: int = 400) -> None: super().__init__(message) self.message = message self.status_code = status_code @dataclass(frozen=True) class RegistryStore: path: Path def init_schema(self) -> None: if str(self.path) != ":memory:": self.path.parent.mkdir(parents=True, exist_ok=True) with self._connect() as db: db.executescript( """ create table if not exists repositories ( slug text primary key, name text not null, remote_url text, default_branch text, state_hub_repo_id text, created_at text not null, updated_at text not null ); create table if not exists snapshots ( id integer primary key autoincrement, repo_slug text not null references repositories(slug), commit_sha text not null, generated_at text not null, graph_json text not null, created_at text not null ); create index if not exists idx_snapshots_repo_latest on snapshots(repo_slug, id desc); create table if not exists discovery_snapshots ( id integer primary key autoincrement, repo_slug text not null references repositories(slug), commit_sha text not null, profile text not null, generated_at text not null, snapshot_json text not null, accepted_graph_snapshot_id integer references snapshots(id), created_at text not null ); create index if not exists idx_discovery_snapshots_repo_latest on discovery_snapshots(repo_slug, profile, id desc); create table if not exists artifacts ( id integer primary key autoincrement, repo_slug text not null references repositories(slug), target_id text, target_kind text, artifact_type text not null, name text not null, uri text not null, media_type text, digest text, version text, metadata_json text not null, created_at text not null ); create index if not exists idx_artifacts_repo on artifacts(repo_slug); create index if not exists idx_artifacts_target on artifacts(target_id); create table if not exists libraries ( id integer primary key autoincrement, repo_slug text not null references repositories(slug), bom_ref text, component_type text not null, name text not null, version text, purl text, scope text, licenses_json text not null, hashes_json text not null, metadata_json text not null, created_at text not null ); create index if not exists idx_libraries_repo on libraries(repo_slug); create index if not exists idx_libraries_purl on libraries(purl); create table if not exists registry_reset_events ( id integer primary key autoincrement, created_at text not null, reason text not null, archive_path text, archive_sha256 text not null, dropped_counts_json text not null ); """ ) def upsert_repository(self, payload: dict[str, Any]) -> dict[str, Any]: slug = _required_text(payload, "slug") now = _utc_now() name = str(payload.get("name") or slug) remote_url = _optional_text(payload, "remote_url") default_branch = str(payload.get("default_branch") or "main") state_hub_repo_id = _optional_text(payload, "state_hub_repo_id") with self._connect() as db: db.execute( """ insert into repositories ( slug, name, remote_url, default_branch, state_hub_repo_id, created_at, updated_at ) values (?, ?, ?, ?, ?, ?, ?) on conflict(slug) do update set name = excluded.name, remote_url = excluded.remote_url, default_branch = excluded.default_branch, state_hub_repo_id = excluded.state_hub_repo_id, updated_at = excluded.updated_at """, (slug, name, remote_url, default_branch, state_hub_repo_id, now, now), ) return self.get_repository(slug) def list_repositories(self) -> list[dict[str, Any]]: with self._connect() as db: rows = db.execute( """ select slug, name, remote_url, default_branch, state_hub_repo_id, created_at, updated_at from repositories order by slug """ ).fetchall() return [_row_dict(row) for row in rows] def get_repository(self, slug: str) -> dict[str, Any]: with self._connect() as db: row = db.execute( """ select slug, name, remote_url, default_branch, state_hub_repo_id, created_at, updated_at from repositories where slug = ? """, (slug,), ).fetchone() if row is None: raise RegistryError(f"repository not found: {slug}", 404) return _row_dict(row) def add_snapshot(self, repo_slug: str, payload: dict[str, Any]) -> dict[str, Any]: self.get_repository(repo_slug) commit = _required_text(payload, "commit") generated_at = str(payload.get("generated_at") or _utc_now()) graph = payload.get("graph") if not isinstance(graph, dict): raise RegistryError("snapshot payload requires object field 'graph'") graph = _with_source(graph, repo_slug, commit, generated_at) graph = materialize_financial_graph_export(graph) validate_graph_export(graph) now = _utc_now() with self._connect() as db: cursor = db.execute( """ insert into snapshots (repo_slug, commit_sha, generated_at, graph_json, created_at) values (?, ?, ?, ?, ?) """, (repo_slug, commit, generated_at, json.dumps(graph, sort_keys=True), now), ) snapshot_id = int(cursor.lastrowid) return self.get_snapshot(snapshot_id) def get_snapshot(self, snapshot_id: int) -> dict[str, Any]: with self._connect() as db: row = db.execute( """ select id, repo_slug, commit_sha, generated_at, graph_json, created_at from snapshots where id = ? """, (snapshot_id,), ).fetchone() if row is None: raise RegistryError(f"snapshot not found: {snapshot_id}", 404) return _snapshot_dict(row) def list_snapshots(self, repo_slug: str) -> list[dict[str, Any]]: self.get_repository(repo_slug) with self._connect() as db: rows = db.execute( """ select id, repo_slug, commit_sha, generated_at, graph_json, created_at from snapshots where repo_slug = ? order by id desc """, (repo_slug,), ).fetchall() return [_snapshot_summary(row) for row in rows] def latest_snapshot(self, repo_slug: str) -> dict[str, Any]: self.get_repository(repo_slug) with self._connect() as db: row = db.execute( """ select id, repo_slug, commit_sha, generated_at, graph_json, created_at from snapshots where repo_slug = ? order by id desc limit 1 """, (repo_slug,), ).fetchone() if row is None: raise RegistryError(f"no snapshots for repository: {repo_slug}", 404) return _snapshot_dict(row) def latest_snapshots(self) -> list[dict[str, Any]]: with self._connect() as db: rows = db.execute( """ select s.id, s.repo_slug, s.commit_sha, s.generated_at, s.graph_json, s.created_at from snapshots s join ( select repo_slug, max(id) as latest_id from snapshots group by repo_slug ) latest on latest.latest_id = s.id order by s.repo_slug """ ).fetchall() return [_snapshot_dict(row) for row in rows] def add_discovery_snapshot(self, repo_slug: str, payload: dict[str, Any]) -> dict[str, Any]: self.get_repository(repo_slug) if not isinstance(payload, dict): raise RegistryError("discovery snapshot payload must be an object") validate_discovery_snapshot(payload) source = payload.get("source") if isinstance(payload.get("source"), dict) else {} if source.get("repo_slug") and source.get("repo_slug") != repo_slug: raise RegistryError("discovery snapshot repo_slug does not match request path") commit = str(source.get("commit") or "working-tree") scan = payload.get("scan") if isinstance(payload.get("scan"), dict) else {} profile = str(scan.get("profile") or "deterministic") generated_at = str(payload.get("generated_at") or _utc_now()) now = _utc_now() with self._connect() as db: cursor = db.execute( """ insert into discovery_snapshots ( repo_slug, commit_sha, profile, generated_at, snapshot_json, accepted_graph_snapshot_id, created_at ) values (?, ?, ?, ?, ?, null, ?) """, (repo_slug, commit, profile, generated_at, json.dumps(payload, sort_keys=True), now), ) discovery_id = int(cursor.lastrowid) return self.get_discovery_snapshot(discovery_id) def get_discovery_snapshot(self, discovery_snapshot_id: int) -> dict[str, Any]: with self._connect() as db: row = db.execute( """ select id, repo_slug, commit_sha, profile, generated_at, snapshot_json, accepted_graph_snapshot_id, created_at from discovery_snapshots where id = ? """, (discovery_snapshot_id,), ).fetchone() if row is None: raise RegistryError(f"discovery snapshot not found: {discovery_snapshot_id}", 404) return _discovery_snapshot_dict(row) def list_discovery_snapshots(self, repo_slug: str, profile: str | None = None) -> list[dict[str, Any]]: self.get_repository(repo_slug) params: list[Any] = [repo_slug] where = "where repo_slug = ?" if profile: where += " and profile = ?" params.append(profile) with self._connect() as db: rows = db.execute( f""" select id, repo_slug, commit_sha, profile, generated_at, snapshot_json, accepted_graph_snapshot_id, created_at from discovery_snapshots {where} order by id desc """, params, ).fetchall() return [_discovery_snapshot_summary(row) for row in rows] def latest_discovery_snapshot(self, repo_slug: str, profile: str | None = None) -> dict[str, Any]: self.get_repository(repo_slug) params: list[Any] = [repo_slug] where = "where repo_slug = ?" if profile: where += " and profile = ?" params.append(profile) with self._connect() as db: row = db.execute( f""" select id, repo_slug, commit_sha, profile, generated_at, snapshot_json, accepted_graph_snapshot_id, created_at from discovery_snapshots {where} order by id desc limit 1 """, params, ).fetchone() if row is None: raise RegistryError(f"no discovery snapshots for repository: {repo_slug}", 404) return _discovery_snapshot_dict(row) def discovery_snapshot_diff( self, repo_slug: str, from_id: int | None = None, to_id: int | None = None, profile: str | None = None, ) -> dict[str, Any]: self.get_repository(repo_slug) if from_id is None or to_id is None: snapshots = self.list_discovery_snapshots(repo_slug, profile=profile) if len(snapshots) < 2: raise RegistryError(f"at least two discovery snapshots are required for diff: {repo_slug}", 404) to_id = snapshots[0]["id"] if to_id is None else to_id from_id = snapshots[1]["id"] if from_id is None else from_id before = self.get_discovery_snapshot(from_id) after = self.get_discovery_snapshot(to_id) if before["repo_slug"] != repo_slug or after["repo_slug"] != repo_slug: raise RegistryError("discovery snapshot ids must belong to the requested repository") return { "repo_slug": repo_slug, "from": _discovery_snapshot_public_summary(before), "to": _discovery_snapshot_public_summary(after), "discovery": _discovery_diff(before["snapshot"], after["snapshot"]), "reconciliation": after["snapshot"].get("reconciliation", {}), } def accept_discovery_snapshot( self, repo_slug: str, discovery_snapshot_id: int, payload: dict[str, Any] | None = None, ) -> dict[str, Any]: payload = payload or {} discovery = self.get_discovery_snapshot(discovery_snapshot_id) if discovery["repo_slug"] != repo_slug: raise RegistryError("discovery snapshot id must belong to the requested repository") base_graph = self._latest_graph_or_empty(repo_slug) accepted_keys = payload.get("accepted_keys") if accepted_keys is not None and not isinstance(accepted_keys, list): raise RegistryError("field 'accepted_keys' must be an array when provided") accepted_key_set = {str(key) for key in accepted_keys or []} accept_review_states = payload.get("accept_review_states", ["accepted"]) if not isinstance(accept_review_states, list): raise RegistryError("field 'accept_review_states' must be an array") graph = _project_discovery_snapshot( base_graph, discovery["snapshot"], accepted_keys=accepted_key_set, accept_review_states={str(state) for state in accept_review_states}, ) snapshot = self.add_snapshot( repo_slug, { "commit": str(payload.get("commit") or f"discovery:{discovery['commit']}"), "generated_at": _utc_now(), "graph": graph, }, ) with self._connect() as db: db.execute( "update discovery_snapshots set accepted_graph_snapshot_id = ? where id = ?", (snapshot["id"], discovery_snapshot_id), ) return { "repo_slug": repo_slug, "discovery_snapshot": self.get_discovery_snapshot(discovery_snapshot_id), "graph_snapshot": snapshot, "projected": { "node_count": len(graph.get("nodes", [])), "edge_count": len(graph.get("edges", [])), }, } def combined_graph(self) -> dict[str, Any]: snapshots = self.latest_snapshots() if snapshots and all(is_financial_graph_export(snapshot["graph"]) for snapshot in snapshots): return merge_financial_graph_exports( [snapshot["graph"] for snapshot in snapshots], generated_at=_utc_now(), ) nodes: dict[str, dict[str, Any]] = {} edges: list[dict[str, str]] = [] for snapshot in snapshots: graph = snapshot["graph"] for node in graph.get("nodes", []): if isinstance(node, dict): nodes[str(node.get("id", ""))] = node for edge in graph.get("edges", []): if isinstance(edge, dict): edges.append(_edge_with_canon_metadata(edge)) return { "apiVersion": "railiance.fabric/v1alpha1", "kind": "FabricGraphExport", "generated_at": _utc_now(), "source": {"repo": "registry", "commit": "", "path": ""}, "nodes": [nodes[key] for key in sorted(nodes)], "edges": sorted(edges, key=lambda edge: (edge["from"], edge["to"], edge["type"])), } def add_artifact(self, payload: dict[str, Any]) -> dict[str, Any]: repo_slug = _required_text(payload, "repo_slug") self.get_repository(repo_slug) artifact_type = _required_text(payload, "artifact_type", fallback_key="type") uri = _required_text(payload, "uri") name = str(payload.get("name") or uri) target_id = _optional_text(payload, "target_id") target_kind = _optional_text(payload, "target_kind") media_type = _optional_text(payload, "media_type") digest = _optional_text(payload, "digest") version = _optional_text(payload, "version") metadata = payload.get("metadata", {}) if not isinstance(metadata, dict): raise RegistryError("field 'metadata' must be an object") now = _utc_now() with self._connect() as db: cursor = db.execute( """ insert into artifacts ( repo_slug, target_id, target_kind, artifact_type, name, uri, media_type, digest, version, metadata_json, created_at ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( repo_slug, target_id, target_kind, artifact_type, name, uri, media_type, digest, version, json.dumps(metadata, sort_keys=True), now, ), ) artifact_id = int(cursor.lastrowid) return self.get_artifact(artifact_id) def list_artifacts( self, repo_slug: str | None = None, target_id: str | None = None, artifact_type: str | None = None, ) -> list[dict[str, Any]]: conditions: list[str] = [] params: list[str] = [] if repo_slug: conditions.append("repo_slug = ?") params.append(repo_slug) if target_id: conditions.append("target_id = ?") params.append(target_id) if artifact_type: conditions.append("artifact_type = ?") params.append(artifact_type) where = f" where {' and '.join(conditions)}" if conditions else "" with self._connect() as db: rows = db.execute( """ select id, repo_slug, target_id, target_kind, artifact_type, name, uri, media_type, digest, version, metadata_json, created_at from artifacts """ + where + " order by repo_slug, target_id, artifact_type, id", params, ).fetchall() return [_artifact_dict(row) for row in rows] def get_artifact(self, artifact_id: int) -> dict[str, Any]: with self._connect() as db: row = db.execute( """ select id, repo_slug, target_id, target_kind, artifact_type, name, uri, media_type, digest, version, metadata_json, created_at from artifacts where id = ? """, (artifact_id,), ).fetchone() if row is None: raise RegistryError(f"artifact not found: {artifact_id}", 404) return _artifact_dict(row) def graph_node_detail(self, graph_id: str) -> dict[str, Any]: node = graph_node(self.combined_graph(), graph_id) enriched = json.loads(json.dumps(node)) enriched["artifacts"] = self.list_artifacts(target_id=graph_id) return enriched def repository_inventory(self, repo_slug: str) -> dict[str, Any]: repository = self.get_repository(repo_slug) try: latest_snapshot = self.latest_snapshot(repo_slug) except RegistryError as exc: if exc.status_code != 404: raise latest_snapshot = None try: latest_discovery_snapshot = self.latest_discovery_snapshot(repo_slug) except RegistryError as exc: if exc.status_code != 404: raise latest_discovery_snapshot = None graph = latest_snapshot["graph"] if latest_snapshot else _empty_graph() nodes = [node for node in graph.get("nodes", []) if isinstance(node, dict)] edges = [edge for edge in graph.get("edges", []) if isinstance(edge, dict)] artifacts = self.list_artifacts(repo_slug=repo_slug) libraries = self.list_libraries(repo_slug=repo_slug) discovery_snapshots = self.list_discovery_snapshots(repo_slug) return { "repository": repository, "latest_snapshot": _snapshot_public_summary(latest_snapshot) if latest_snapshot else None, "latest_discovery_snapshot": ( _discovery_snapshot_public_summary(latest_discovery_snapshot) if latest_discovery_snapshot else None ), "discovery_health": ( _discovery_snapshot_health(latest_discovery_snapshot) if latest_discovery_snapshot else {"health": "unknown", "review_required": False} ), "counts": { "snapshots": len(self.list_snapshots(repo_slug)), "discovery_snapshots": len(discovery_snapshots), "nodes": len(nodes), "edges": len(edges), "artifacts": len(artifacts), "libraries": len(libraries), }, "graph": { "nodes": nodes, "edges": edges, "owner_repos": sorted({str(node.get("repo", "")) for node in nodes if node.get("repo")}), }, "artifacts": artifacts, "libraries": libraries, } def ingest_cyclonedx(self, repo_slug: str, payload: dict[str, Any]) -> dict[str, Any]: self.get_repository(repo_slug) bom = payload.get("bom") if "bom" in payload else payload if not isinstance(bom, dict): raise RegistryError("CycloneDX payload must be an object") if bom.get("bomFormat") and bom.get("bomFormat") != "CycloneDX": raise RegistryError("CycloneDX payload must have bomFormat 'CycloneDX'") entries = _cyclonedx_entries(bom) now = _utc_now() with self._connect() as db: db.execute("delete from libraries where repo_slug = ?", (repo_slug,)) for entry in entries: db.execute( """ insert into libraries ( repo_slug, bom_ref, component_type, name, version, purl, scope, licenses_json, hashes_json, metadata_json, created_at ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( repo_slug, entry["bom_ref"], entry["component_type"], entry["name"], entry["version"], entry["purl"], entry["scope"], json.dumps(entry["licenses"], sort_keys=True), json.dumps(entry["hashes"], sort_keys=True), json.dumps(entry["metadata"], sort_keys=True), now, ), ) return { "repo_slug": repo_slug, "component_count": len(entries), "libraries": self.list_libraries(repo_slug=repo_slug), } def list_libraries( self, repo_slug: str | None = None, name: str | None = None, purl: str | None = None, component_type: str | None = None, ) -> list[dict[str, Any]]: conditions: list[str] = [] params: list[str] = [] if repo_slug: conditions.append("repo_slug = ?") params.append(repo_slug) if name: conditions.append("name = ?") params.append(name) if purl: conditions.append("purl = ?") params.append(purl) if component_type: conditions.append("component_type = ?") params.append(component_type) where = f" where {' and '.join(conditions)}" if conditions else "" with self._connect() as db: rows = db.execute( """ select id, repo_slug, bom_ref, component_type, name, version, purl, scope, licenses_json, hashes_json, metadata_json, created_at from libraries """ + where + " order by repo_slug, name, version, id", params, ).fetchall() return [_library_dict(row) for row in rows] def get_library(self, library_id: int) -> dict[str, Any]: with self._connect() as db: row = db.execute( """ select id, repo_slug, bom_ref, component_type, name, version, purl, scope, licenses_json, hashes_json, metadata_json, created_at from libraries where id = ? """, (library_id,), ).fetchone() if row is None: raise RegistryError(f"library not found: {library_id}", 404) return _library_dict(row) def snapshot_diff( self, repo_slug: str, from_id: int | None = None, to_id: int | None = None, ) -> dict[str, Any]: self.get_repository(repo_slug) if from_id is None or to_id is None: snapshots = self.list_snapshots(repo_slug) if len(snapshots) < 2: raise RegistryError(f"at least two snapshots are required for diff: {repo_slug}", 404) to_id = snapshots[0]["id"] if to_id is None else to_id from_id = snapshots[1]["id"] if from_id is None else from_id before = self.get_snapshot(from_id) after = self.get_snapshot(to_id) if before["repo_slug"] != repo_slug or after["repo_slug"] != repo_slug: raise RegistryError("snapshot ids must belong to the requested repository") return { "repo_slug": repo_slug, "from": _snapshot_public_summary(before), "to": _snapshot_public_summary(after), "graph": _graph_diff(before["graph"], after["graph"]), } def search(self, query: str) -> dict[str, Any]: needle = query.strip().lower() if not needle: raise RegistryError("query parameter 'q' is required") graph = self.combined_graph() return { "query": query, "nodes": [ node for node in graph.get("nodes", []) if isinstance(node, dict) and _matches(needle, node) ], "artifacts": [ artifact for artifact in self.list_artifacts() if _matches(needle, artifact) ], "libraries": [ library for library in self.list_libraries() if _matches(needle, library) ], "repositories": [ repository for repository in self.list_repositories() if _matches(needle, repository) ], } def status(self) -> dict[str, Any]: with self._connect() as db: counts = { "repositories": db.execute("select count(*) from repositories").fetchone()[0], "snapshots": db.execute("select count(*) from snapshots").fetchone()[0], "discovery_snapshots": db.execute("select count(*) from discovery_snapshots").fetchone()[0], "artifacts": db.execute("select count(*) from artifacts").fetchone()[0], "libraries": db.execute("select count(*) from libraries").fetchone()[0], "registry_reset_events": db.execute("select count(*) from registry_reset_events").fetchone()[0], } latest = [ { "repo_slug": snapshot["repo_slug"], "snapshot_id": snapshot["id"], "commit": snapshot["commit"], "generated_at": snapshot["generated_at"], } for snapshot in self.latest_snapshots() ] latest_discovery = [ _discovery_snapshot_public_summary(snapshot) for snapshot in self.latest_discovery_snapshots() ] return { "status": "ok", "database": str(self.path), "counts": counts, "latest_snapshots": latest, "latest_discovery_snapshots": latest_discovery, } def reset_archive(self) -> dict[str, Any]: with self._connect() as db: snapshot_rows = db.execute( """ select id, repo_slug, commit_sha, generated_at, graph_json, created_at from snapshots order by repo_slug, id """ ).fetchall() discovery_rows = db.execute( """ select id, repo_slug, commit_sha, profile, generated_at, snapshot_json, accepted_graph_snapshot_id, created_at from discovery_snapshots order by repo_slug, profile, id """ ).fetchall() artifact_rows = db.execute( """ select id, repo_slug, target_id, target_kind, artifact_type, name, uri, media_type, digest, version, metadata_json, created_at from artifacts order by repo_slug, id """ ).fetchall() library_rows = db.execute( """ select id, repo_slug, bom_ref, component_type, name, version, purl, scope, licenses_json, hashes_json, metadata_json, created_at from libraries order by repo_slug, id """ ).fetchall() reset_rows = db.execute( """ select id, created_at, reason, archive_path, archive_sha256, dropped_counts_json from registry_reset_events order by id """ ).fetchall() return { "apiVersion": "railiance.fabric/v1alpha1", "kind": "RegistryResetArchive", "generated_at": _utc_now(), "source": {"database": str(self.path)}, "counts": self.status()["counts"], "combined_graph": self.combined_graph(), "repositories": self.list_repositories(), "snapshots": [_snapshot_dict(row) for row in snapshot_rows], "discovery_snapshots": [_discovery_snapshot_dict(row) for row in discovery_rows], "artifacts": [_artifact_dict(row) for row in artifact_rows], "libraries": [_library_dict(row) for row in library_rows], "reset_events": [_reset_event_dict(row) for row in reset_rows], "rollback": { "limits": ( "This archive is a JSON evidence bundle, not an automatic SQLite restore. " "Use it to inspect and manually reinsert prior registry graph data if needed." ), "post_reset_source_of_truth": ( "Repository registrations remain in the registry. Graph snapshots, discovery " "snapshots, artifacts, and library inventory must be recreated by reingesting " "registered/local repositories with the canon-aligned scanner and graph model." ), }, } def reset_graph_data(self, payload: dict[str, Any]) -> dict[str, Any]: confirm = _required_text(payload, "confirm") if confirm != RESET_CONFIRMATION_TOKEN: raise RegistryError( f"reset requires confirm={RESET_CONFIRMATION_TOKEN!r}", 400, ) reason = _required_text(payload, "reason") archive_sha256 = _required_text(payload, "archive_sha256") archive_path = _optional_text(payload, "archive_path") now = _utc_now() with self._connect() as db: counts = _resettable_counts(db) cursor = db.execute( """ insert into registry_reset_events ( created_at, reason, archive_path, archive_sha256, dropped_counts_json ) values (?, ?, ?, ?, ?) """, (now, reason, archive_path, archive_sha256, json.dumps(counts, sort_keys=True)), ) event_id = int(cursor.lastrowid) db.execute("delete from discovery_snapshots") db.execute("delete from snapshots") db.execute("delete from artifacts") db.execute("delete from libraries") event = self.get_reset_event(event_id) return { **event, "confirm": confirm, "repositories_preserved": len(self.list_repositories()), } def get_reset_event(self, event_id: int) -> dict[str, Any]: with self._connect() as db: row = db.execute( """ select id, created_at, reason, archive_path, archive_sha256, dropped_counts_json from registry_reset_events where id = ? """, (event_id,), ).fetchone() if row is None: raise RegistryError(f"reset event not found: {event_id}", 404) return _reset_event_dict(row) def latest_discovery_snapshots(self, profile: str | None = None) -> list[dict[str, Any]]: params: list[Any] = [] where = "" if profile: where = "where profile = ?" params.append(profile) with self._connect() as db: rows = db.execute( f""" select ds.id, ds.repo_slug, ds.commit_sha, ds.profile, ds.generated_at, ds.snapshot_json, ds.accepted_graph_snapshot_id, ds.created_at from discovery_snapshots ds join ( select repo_slug, profile, max(id) as latest_id from discovery_snapshots {where} group by repo_slug, profile ) latest on latest.latest_id = ds.id order by ds.repo_slug, ds.profile """, params, ).fetchall() return [_discovery_snapshot_dict(row) for row in rows] def _latest_graph_or_empty(self, repo_slug: str) -> dict[str, Any]: try: return self.latest_snapshot(repo_slug)["graph"] except RegistryError as exc: if exc.status_code != 404: raise return _empty_graph() def _connect(self) -> sqlite3.Connection: db = sqlite3.connect(self.path) db.row_factory = sqlite3.Row return db def validate_graph_export(graph: dict[str, Any]) -> None: graph = materialize_financial_graph_export(graph) if is_financial_graph_export(graph): errors = financial_graph_errors(graph) if errors: raise RegistryError(f"invalid FinancialFabricGraphExport: {errors[0]}") canon_errors = _graph_canon_metadata_errors(graph) if canon_errors: raise RegistryError(f"invalid FinancialFabricGraphExport canon metadata: {canon_errors[0]}") return schema_path = repo_root() / "schemas" / "state-hub-export.schema.yaml" validator = draft202012_validator(schema_path) errors = sorted(validator.iter_errors(graph), key=lambda error: list(error.path)) if errors: error = errors[0] location = ".".join(str(part) for part in error.path) or "" raise RegistryError(f"invalid FabricGraphExport at {location}: {error.message}") canon_errors = _graph_canon_metadata_errors(graph) if canon_errors: raise RegistryError(f"invalid FabricGraphExport canon metadata: {canon_errors[0]}") def validate_discovery_snapshot(snapshot: dict[str, Any]) -> None: schema_path = repo_root() / "schemas" / "discovery-snapshot.schema.yaml" validator = draft202012_validator(schema_path) errors = sorted(validator.iter_errors(snapshot), key=lambda error: list(error.path)) if errors: error = errors[0] location = ".".join(str(part) for part in error.path) or "" raise RegistryError(f"invalid FabricDiscoverySnapshot at {location}: {error.message}") canon_errors = _discovery_canon_metadata_errors(snapshot) if canon_errors: raise RegistryError(f"invalid FabricDiscoverySnapshot canon metadata: {canon_errors[0]}") def _graph_canon_metadata_errors(graph: dict[str, Any]) -> list[str]: errors: list[str] = [] for index, node in enumerate(graph.get("nodes", [])): if not isinstance(node, dict): continue if _has_any(node, ("canon_category", "canon_anchor", "mapping_fit", "evidence_state")): _require_fields( errors, f"nodes[{index}]", node, ("canon_category", "mapping_fit", "evidence_state"), ) for index, edge in enumerate(graph.get("edges", [])): if not isinstance(edge, dict): continue _validate_edge_canon_metadata(errors, f"edges[{index}]", edge, type_field="type") return errors def _discovery_canon_metadata_errors(snapshot: dict[str, Any]) -> list[str]: errors: list[str] = [] candidates = snapshot.get("candidates") if isinstance(snapshot.get("candidates"), dict) else {} for index, node in enumerate(candidates.get("nodes", [])): if not isinstance(node, dict): continue _require_fields( errors, f"candidates.nodes[{index}]", node, ("canon_category", "mapping_fit", "evidence_state"), ) for index, edge in enumerate(candidates.get("edges", [])): if not isinstance(edge, dict): continue _require_fields( errors, f"candidates.edges[{index}]", edge, ("mapping_fit", "display_only", "evidence_state"), ) _validate_edge_canon_metadata(errors, f"candidates.edges[{index}]", edge, type_field="edge_type") return errors def _validate_edge_canon_metadata( errors: list[str], path: str, edge: dict[str, Any], *, type_field: str, ) -> None: edge_type = str(edge.get(type_field) or "") has_canon_fields = _has_any( edge, ("canonical_type", "canon_anchor", "mapping_fit", "display_only", "evidence_state"), ) if has_canon_fields: _require_fields(errors, path, edge, ("mapping_fit", "display_only", "evidence_state")) if edge_type in DISPLAY_ONLY_EDGE_TYPES and edge.get("display_only") is not True: errors.append(f"{path} uses display-only edge type {edge_type!r} without display_only=true") if edge.get("display_only") is True and edge_type and not has_canon_fields: errors.append(f"{path} is display-only but lacks canon metadata") def _has_any(item: dict[str, Any], fields: tuple[str, ...]) -> bool: return any(field in item for field in fields) def _require_fields( errors: list[str], path: str, item: dict[str, Any], fields: tuple[str, ...], ) -> None: for field in fields: if field not in item or item.get(field) in (None, ""): errors.append(f"{path} missing required canon metadata field {field!r}") def providers(graph: dict[str, Any], capability: str) -> list[dict[str, Any]]: result = [] for node in _nodes(graph): attrs = _attrs(node) if node.get("kind") != "CapabilityDeclaration": continue if node.get("id") == capability or attrs.get("capability_type") == capability: result.append( { "provider_id": node.get("id", ""), "name": node.get("name", ""), "service_id": attrs.get("service_id", ""), "capability_type": attrs.get("capability_type", ""), "lifecycle": node.get("lifecycle", ""), "interfaces": attrs.get("interface_ids", []), "repo": node.get("repo", ""), "domain": node.get("domain", ""), } ) return sorted(result, key=lambda item: item["provider_id"]) def consumers(graph: dict[str, Any], target: str) -> list[dict[str, Any]]: nodes = _nodes_by_id(graph) target_interface_type = "" target_node = nodes.get(target) if target_node and target_node.get("kind") == "InterfaceDeclaration": target_interface_type = str(_attrs(target_node).get("interface_type", "")) result: list[dict[str, Any]] = [] bindings_by_dependency = _bindings_by_dependency(graph) for dependency in _dependency_nodes(graph): attrs = _attrs(dependency) dependency_id = str(dependency.get("id", "")) dependency_matches = target in { dependency_id, str(attrs.get("requires_capability_id", "")), str(attrs.get("requires_capability_type", "")), str(attrs.get("interface_type", "")), } or bool(target_interface_type and target_interface_type == attrs.get("interface_type")) bindings = bindings_by_dependency.get(dependency_id, []) matching_bindings = [ binding for binding in bindings if target in {binding["provider_capability_id"], binding["provider_interface_id"]} ] if dependency_matches and not bindings: result.append(_consumer_match(attrs, dependency_id, {})) for binding in bindings: if dependency_matches or binding in matching_bindings: result.append(_consumer_match(attrs, dependency_id, binding)) return sorted(result, key=lambda item: (item["consumer_service_id"], item["dependency_id"])) def unresolved_dependencies(graph: dict[str, Any]) -> list[dict[str, Any]]: result = [] bindings_by_dependency = _bindings_by_dependency(graph) for dependency in _dependency_nodes(graph): attrs = _attrs(dependency) dependency_id = str(dependency.get("id", "")) required_id = str(attrs.get("requires_capability_id", "")) required_type = str(attrs.get("requires_capability_type", "")) provider_matches = providers(graph, required_id or required_type) bindings = bindings_by_dependency.get(dependency_id, []) has_missing_binding = any(binding.get("status") in {"missing", "disputed"} for binding in bindings) if not provider_matches or has_missing_binding: result.append( { "dependency_id": dependency_id, "consumer_service_id": attrs.get("consumer_service_id", ""), "requires_capability_id": required_id, "requires_capability_type": required_type, "interface_type": attrs.get("interface_type", ""), "reason": "missing_provider" if not provider_matches else "binding_not_exact", } ) return sorted(result, key=lambda item: item["dependency_id"]) def blast_radius(graph: dict[str, Any], interface: str) -> list[dict[str, Any]]: target_node = _nodes_by_id(graph).get(interface) matches = consumers(graph, interface) if target_node and target_node.get("kind") == "InterfaceDeclaration": return [match for match in matches if match.get("provider_interface_id") == interface] return [ match for match in matches if _dependency_attrs(graph, match["dependency_id"]).get("interface_type") == interface ] def dependency_path_lines(graph: dict[str, Any], service_id: str) -> list[str]: nodes = _nodes_by_id(graph) if service_id not in nodes or nodes[service_id].get("kind") != "ServiceDeclaration": return [f"unknown service: {service_id}"] deps_by_consumer: dict[str, list[dict[str, Any]]] = {} for dependency in _dependency_nodes(graph): attrs = _attrs(dependency) deps_by_consumer.setdefault(str(attrs.get("consumer_service_id", "")), []).append(dependency) capability_service = { str(node.get("id", "")): str(_attrs(node).get("service_id", "")) for node in _nodes(graph) if node.get("kind") == "CapabilityDeclaration" } bindings_by_dependency = _bindings_by_dependency(graph) lines: list[str] = [] def walk(current: str, indent: int, stack: list[str]) -> None: prefix = " " * indent if current in stack: lines.append(f"{prefix}{current} (cycle)") return lines.append(f"{prefix}{current}") dependencies = sorted(deps_by_consumer.get(current, []), key=lambda item: str(item.get("id", ""))) if not dependencies: lines.append(f"{prefix} no declared dependencies") return for dependency in dependencies: dependency_id = str(dependency.get("id", "")) attrs = _attrs(dependency) required = attrs.get("requires_capability_type", "") lines.append(f"{prefix} requires {required}: {dependency_id}") bindings = bindings_by_dependency.get(dependency_id, []) if not bindings: candidate_providers = providers(graph, str(required)) if candidate_providers: for provider in candidate_providers: lines.append(f"{prefix} candidate {provider['provider_id']}") else: lines.append(f"{prefix} unresolved") continue for binding in bindings: provider_id = binding.get("provider_capability_id", "") provider_service = capability_service.get(provider_id, "") status = binding.get("status", "") lines.append(f"{prefix} {status} -> {provider_id}") if provider_service and provider_service != current: walk(provider_service, indent + 3, stack + [current]) walk(service_id, 0, []) return lines def graph_node(graph: dict[str, Any], graph_id: str) -> dict[str, Any]: node = _nodes_by_id(graph).get(graph_id) if node is None: raise RegistryError(f"graph node not found: {graph_id}", 404) return node def backstage_projection(graph: dict[str, Any]) -> dict[str, Any]: items: list[dict[str, Any]] = [] domains = sorted({str(node.get("domain", "")) for node in _nodes(graph) if node.get("domain")}) for domain in domains: items.append( { "apiVersion": "backstage.io/v1alpha1", "kind": "Domain", "metadata": {"name": _backstage_name(domain), "title": domain}, "spec": {"owner": _backstage_name(domain)}, } ) items.append( { "apiVersion": "backstage.io/v1alpha1", "kind": "System", "metadata": {"name": _backstage_name(domain), "title": domain}, "spec": {"owner": _backstage_name(domain), "domain": _backstage_name(domain)}, } ) for node in _nodes(graph): kind = str(node.get("kind", "")) if kind == "ServiceDeclaration": attrs = _attrs(node) items.append( { "apiVersion": "backstage.io/v1alpha1", "kind": "Component", "metadata": _backstage_metadata(node), "spec": { "type": "service", "lifecycle": _backstage_lifecycle(str(node.get("lifecycle", ""))), "owner": _backstage_owner(node), "system": _backstage_name(str(node.get("domain", ""))), "providesApis": [_backstage_name(value) for value in attrs.get("exposes_interfaces", [])], }, } ) elif kind == "InterfaceDeclaration": attrs = _attrs(node) items.append( { "apiVersion": "backstage.io/v1alpha1", "kind": "API", "metadata": _backstage_metadata(node), "spec": { "type": _backstage_api_type(str(attrs.get("interface_type", ""))), "lifecycle": _backstage_lifecycle(str(node.get("lifecycle", ""))), "owner": _backstage_owner(node), "system": _backstage_name(str(node.get("domain", ""))), "definition": "", }, } ) elif kind == "CapabilityDeclaration": attrs = _attrs(node) items.append( { "apiVersion": "backstage.io/v1alpha1", "kind": "Resource", "metadata": _backstage_metadata(node), "spec": { "type": attrs.get("capability_type", "capability"), "owner": _backstage_owner(node), "system": _backstage_name(str(node.get("domain", ""))), }, } ) return { "apiVersion": "railiance.fabric/v1alpha1", "kind": "BackstageCatalogProjection", "items": items, } def xregistry_projection(graph: dict[str, Any]) -> dict[str, Any]: groups = { "services": _xregistry_group("service", "services"), "capabilities": _xregistry_group("capability", "capabilities"), "interfaces": _xregistry_group("interface", "interfaces"), "dependencies": _xregistry_group("dependency", "dependencies"), "bindings": _xregistry_group("binding", "bindings"), } group_by_kind = { "ServiceDeclaration": "services", "CapabilityDeclaration": "capabilities", "InterfaceDeclaration": "interfaces", "DependencyDeclaration": "dependencies", "BindingAssertion": "bindings", } for node in _nodes(graph): group_name = group_by_kind.get(str(node.get("kind", ""))) if group_name is None: continue node_id = str(node.get("id", "")) groups[group_name]["resources"][_xregistry_key(node_id)] = { "id": node_id, "name": node.get("name", node_id), "versionid": "latest", "attributes": { "kind": node.get("kind", ""), "repo": node.get("repo", ""), "domain": node.get("domain", ""), "lifecycle": node.get("lifecycle", ""), **_attrs(node), }, } return { "apiVersion": "railiance.fabric/v1alpha1", "kind": "XRegistryProjection", "groups": groups, } def library_xregistry_projection(libraries: list[dict[str, Any]]) -> dict[str, Any]: group = _xregistry_group("library", "libraries") for library in libraries: key = _xregistry_key(str(library.get("purl") or library.get("bom_ref") or library.get("name", ""))) group["resources"][key] = { "id": library.get("purl") or library.get("bom_ref") or library.get("name", ""), "name": library.get("name", ""), "versionid": library.get("version") or "unknown", "attributes": { "repo": library.get("repo_slug", ""), "bom_ref": library.get("bom_ref", ""), "component_type": library.get("component_type", ""), "scope": library.get("scope", ""), "licenses": library.get("licenses", []), "hashes": library.get("hashes", []), }, } return { "apiVersion": "railiance.fabric/v1alpha1", "kind": "LibraryXRegistryProjection", "groups": {"libraries": group}, } def _with_source(graph: dict[str, Any], repo_slug: str, commit: str, generated_at: str) -> dict[str, Any]: copy = json.loads(json.dumps(graph)) copy.setdefault("generated_at", generated_at) copy.setdefault("source", {}) copy["source"].setdefault("repo", repo_slug) copy["source"].setdefault("commit", commit) copy["source"].setdefault("path", "") return copy def _snapshot_dict(row: sqlite3.Row) -> dict[str, Any]: return { "id": row["id"], "repo_slug": row["repo_slug"], "commit": row["commit_sha"], "generated_at": row["generated_at"], "graph": json.loads(row["graph_json"]), "created_at": row["created_at"], } def _snapshot_summary(row: sqlite3.Row) -> dict[str, Any]: graph = json.loads(row["graph_json"]) return { "id": row["id"], "repo_slug": row["repo_slug"], "commit": row["commit_sha"], "generated_at": row["generated_at"], "created_at": row["created_at"], "node_count": len(graph.get("nodes", [])), "edge_count": len(graph.get("edges", [])), } def _snapshot_public_summary(snapshot: dict[str, Any]) -> dict[str, Any]: graph = snapshot["graph"] return { "id": snapshot["id"], "repo_slug": snapshot["repo_slug"], "commit": snapshot["commit"], "generated_at": snapshot["generated_at"], "created_at": snapshot["created_at"], "node_count": len(graph.get("nodes", [])), "edge_count": len(graph.get("edges", [])), } def _discovery_snapshot_dict(row: sqlite3.Row) -> dict[str, Any]: snapshot = json.loads(row["snapshot_json"]) return { "id": row["id"], "repo_slug": row["repo_slug"], "commit": row["commit_sha"], "profile": row["profile"], "generated_at": row["generated_at"], "snapshot": snapshot, "accepted_graph_snapshot_id": row["accepted_graph_snapshot_id"], "created_at": row["created_at"], } def _discovery_snapshot_summary(row: sqlite3.Row) -> dict[str, Any]: snapshot = json.loads(row["snapshot_json"]) candidates = snapshot.get("candidates") if isinstance(snapshot.get("candidates"), dict) else {} reconciliation = snapshot.get("reconciliation") if isinstance(snapshot.get("reconciliation"), dict) else {} diff = reconciliation.get("diff") if isinstance(reconciliation.get("diff"), dict) else {} return { "id": row["id"], "repo_slug": row["repo_slug"], "commit": row["commit_sha"], "profile": row["profile"], "generated_at": row["generated_at"], "accepted_graph_snapshot_id": row["accepted_graph_snapshot_id"], "created_at": row["created_at"], "candidate_counts": { "nodes": len(candidates.get("nodes", [])) if isinstance(candidates.get("nodes"), list) else 0, "edges": len(candidates.get("edges", [])) if isinstance(candidates.get("edges"), list) else 0, "attributes": len(candidates.get("attributes", [])) if isinstance(candidates.get("attributes"), list) else 0, }, "diff_counts": { "added": len(diff.get("added", [])) if isinstance(diff.get("added"), list) else 0, "changed": len(diff.get("changed", [])) if isinstance(diff.get("changed"), list) else 0, "retired": len(diff.get("retired", [])) if isinstance(diff.get("retired"), list) else 0, "conflicted": len(diff.get("conflicted", [])) if isinstance(diff.get("conflicted"), list) else 0, }, "review_artifact_count": len(snapshot.get("review_artifacts", [])) if isinstance(snapshot.get("review_artifacts"), list) else 0, "connector_run_count": len(snapshot.get("connector_runs", [])) if isinstance(snapshot.get("connector_runs"), list) else 0, } def _discovery_snapshot_public_summary(snapshot: dict[str, Any]) -> dict[str, Any]: candidates = snapshot["snapshot"].get("candidates", {}) reconciliation = snapshot["snapshot"].get("reconciliation", {}) diff = reconciliation.get("diff", {}) if isinstance(reconciliation, dict) else {} result = { "id": snapshot["id"], "repo_slug": snapshot["repo_slug"], "commit": snapshot["commit"], "profile": snapshot["profile"], "generated_at": snapshot["generated_at"], "accepted_graph_snapshot_id": snapshot["accepted_graph_snapshot_id"], "created_at": snapshot["created_at"], "candidate_counts": { "nodes": len(candidates.get("nodes", [])) if isinstance(candidates.get("nodes"), list) else 0, "edges": len(candidates.get("edges", [])) if isinstance(candidates.get("edges"), list) else 0, "attributes": len(candidates.get("attributes", [])) if isinstance(candidates.get("attributes"), list) else 0, }, "diff_counts": { "added": len(diff.get("added", [])) if isinstance(diff.get("added"), list) else 0, "changed": len(diff.get("changed", [])) if isinstance(diff.get("changed"), list) else 0, "retired": len(diff.get("retired", [])) if isinstance(diff.get("retired"), list) else 0, "conflicted": len(diff.get("conflicted", [])) if isinstance(diff.get("conflicted"), list) else 0, }, } result.update(_discovery_snapshot_health(snapshot)) return result def _discovery_snapshot_health(snapshot: dict[str, Any]) -> dict[str, Any]: payload = snapshot.get("snapshot") if isinstance(snapshot.get("snapshot"), dict) else {} reconciliation = payload.get("reconciliation") if isinstance(payload.get("reconciliation"), dict) else {} diff = reconciliation.get("diff") if isinstance(reconciliation.get("diff"), dict) else {} diff_counts = { "added": len(diff.get("added", [])) if isinstance(diff.get("added"), list) else 0, "changed": len(diff.get("changed", [])) if isinstance(diff.get("changed"), list) else 0, "retired": len(diff.get("retired", [])) if isinstance(diff.get("retired"), list) else 0, "conflicted": len(diff.get("conflicted", [])) if isinstance(diff.get("conflicted"), list) else 0, } review_artifact_count = ( len(payload.get("review_artifacts", [])) if isinstance(payload.get("review_artifacts"), list) else 0 ) connector_run_count = ( len(payload.get("connector_runs", [])) if isinstance(payload.get("connector_runs"), list) else 0 ) tombstone_count = ( len(payload.get("tombstones", [])) if isinstance(payload.get("tombstones"), list) else 0 ) review_required = bool(diff_counts["conflicted"] or review_artifact_count or tombstone_count) changed = any(diff_counts.values()) if review_required: health = "needs_review" elif changed: health = "changed" else: health = "fresh" return { "health": health, "review_required": review_required, "review_artifact_count": review_artifact_count, "connector_run_count": connector_run_count, "tombstone_count": tombstone_count, } def _row_dict(row: sqlite3.Row) -> dict[str, Any]: return {key: row[key] for key in row.keys()} def _resettable_counts(db: sqlite3.Connection) -> dict[str, int]: return { "snapshots": int(db.execute("select count(*) from snapshots").fetchone()[0]), "discovery_snapshots": int(db.execute("select count(*) from discovery_snapshots").fetchone()[0]), "artifacts": int(db.execute("select count(*) from artifacts").fetchone()[0]), "libraries": int(db.execute("select count(*) from libraries").fetchone()[0]), } def _reset_event_dict(row: sqlite3.Row) -> dict[str, Any]: return { "id": row["id"], "created_at": row["created_at"], "reason": row["reason"], "archive_path": row["archive_path"], "archive_sha256": row["archive_sha256"], "dropped_counts": json.loads(row["dropped_counts_json"]), } def _artifact_dict(row: sqlite3.Row) -> dict[str, Any]: return { "id": row["id"], "repo_slug": row["repo_slug"], "target_id": row["target_id"], "target_kind": row["target_kind"], "artifact_type": row["artifact_type"], "name": row["name"], "uri": row["uri"], "media_type": row["media_type"], "digest": row["digest"], "version": row["version"], "metadata": json.loads(row["metadata_json"]), "created_at": row["created_at"], } def _library_dict(row: sqlite3.Row) -> dict[str, Any]: return { "id": row["id"], "repo_slug": row["repo_slug"], "bom_ref": row["bom_ref"], "component_type": row["component_type"], "name": row["name"], "version": row["version"], "purl": row["purl"], "scope": row["scope"], "licenses": json.loads(row["licenses_json"]), "hashes": json.loads(row["hashes_json"]), "metadata": json.loads(row["metadata_json"]), "created_at": row["created_at"], } def _cyclonedx_entries(bom: dict[str, Any]) -> list[dict[str, Any]]: entries: list[dict[str, Any]] = [] for component in bom.get("components", []): if not isinstance(component, dict): continue name = str(component.get("name") or "").strip() if not name: continue entries.append( { "bom_ref": _optional_component_text(component, "bom-ref"), "component_type": str(component.get("type") or "library"), "name": name, "version": _optional_component_text(component, "version"), "purl": _optional_component_text(component, "purl"), "scope": _optional_component_text(component, "scope"), "licenses": _normalize_licenses(component.get("licenses", [])), "hashes": component.get("hashes", []) if isinstance(component.get("hashes", []), list) else [], "metadata": { "group": component.get("group"), "publisher": component.get("publisher"), "description": component.get("description"), "externalReferences": component.get("externalReferences", []), }, } ) for service in bom.get("services", []): if not isinstance(service, dict): continue name = str(service.get("name") or "").strip() if not name: continue entries.append( { "bom_ref": _optional_component_text(service, "bom-ref"), "component_type": "service", "name": name, "version": _optional_component_text(service, "version"), "purl": None, "scope": None, "licenses": [], "hashes": [], "metadata": { "provider": service.get("provider"), "endpoints": service.get("endpoints", []), "externalReferences": service.get("externalReferences", []), }, } ) return entries def _optional_component_text(component: dict[str, Any], key: str) -> str | None: value = component.get(key) if value is None: return None return str(value) def _normalize_licenses(raw: Any) -> list[dict[str, Any]]: if not isinstance(raw, list): return [] normalized = [] for item in raw: if isinstance(item, dict): normalized.append(item) return normalized def _empty_graph() -> dict[str, Any]: return { "apiVersion": "railiance.fabric/v1alpha1", "kind": "FabricGraphExport", "nodes": [], "edges": [], } def _discovery_diff(before: dict[str, Any], after: dict[str, Any]) -> dict[str, Any]: result: dict[str, Any] = {} for collection in ("nodes", "edges", "attributes"): before_items = _discovery_candidates_by_key(before, collection) after_items = _discovery_candidates_by_key(after, collection) added = sorted(set(after_items) - set(before_items)) removed = sorted(set(before_items) - set(after_items)) common = sorted(set(before_items) & set(after_items)) changed = [ key for key in common if _stable_json(_without_review_noise(before_items[key])) != _stable_json(_without_review_noise(after_items[key])) ] confidence_changed = [ { "stable_key": key, "before": before_items[key].get("confidence"), "after": after_items[key].get("confidence"), } for key in common if before_items[key].get("confidence") != after_items[key].get("confidence") ] result[collection] = { "added": [after_items[key] for key in added], "removed": [before_items[key] for key in removed], "changed": [ { "stable_key": key, "before": before_items[key], "after": after_items[key], } for key in changed ], "confidence_changed": confidence_changed, } after_reconciliation = after.get("reconciliation") if isinstance(after.get("reconciliation"), dict) else {} result["review"] = { "retired": after_reconciliation.get("diff", {}).get("retired", []) if isinstance(after_reconciliation.get("diff"), dict) else [], "conflicted": after_reconciliation.get("diff", {}).get("conflicted", []) if isinstance(after_reconciliation.get("diff"), dict) else [], "conflicts": after_reconciliation.get("conflicts", []), } return result def _project_discovery_snapshot( base_graph: dict[str, Any], discovery_snapshot: dict[str, Any], *, accepted_keys: set[str], accept_review_states: set[str], ) -> dict[str, Any]: graph = json.loads(json.dumps(base_graph)) graph.setdefault("apiVersion", "railiance.fabric/v1alpha1") graph.setdefault("kind", "FabricGraphExport") graph.setdefault("nodes", []) graph.setdefault("edges", []) candidates = discovery_snapshot.get("candidates") if isinstance(discovery_snapshot.get("candidates"), dict) else {} candidate_nodes = [ node for node in candidates.get("nodes", []) if isinstance(node, dict) and _candidate_is_accepted(node, accepted_keys, accept_review_states) ] candidate_edges = [ edge for edge in candidates.get("edges", []) if isinstance(edge, dict) and _candidate_is_accepted(edge, accepted_keys, accept_review_states) ] existing_nodes = { str(node.get("id")): node for node in graph.get("nodes", []) if isinstance(node, dict) and node.get("id") } key_to_graph_id = { str(node.get("id")): str(node.get("id")) for node in graph.get("nodes", []) if isinstance(node, dict) and node.get("id") } for candidate in candidate_nodes: graph_id = _candidate_graph_id(candidate) key_to_graph_id[str(candidate.get("stable_key"))] = graph_id if candidate.get("graph_id"): key_to_graph_id[str(candidate.get("graph_id"))] = graph_id if graph_id in existing_nodes: continue projected = _project_candidate_node(candidate, graph_id) existing_nodes[graph_id] = projected graph["nodes"].append(projected) existing_edges = { _edge_key(edge) for edge in graph.get("edges", []) if isinstance(edge, dict) } for candidate in candidate_edges: source = key_to_graph_id.get(str(candidate.get("source_key") or "")) target = key_to_graph_id.get(str(candidate.get("target_key") or "")) if not source or not target: continue edge = _edge_with_canon_metadata( { "from": source, "to": target, "type": str(candidate.get("edge_type") or ""), "canonical_type": candidate.get("canonical_type", ""), "canon_anchor": candidate.get("canon_anchor", ""), "mapping_fit": candidate.get("mapping_fit", ""), "display_only": candidate.get("display_only", False), "evidence_state": candidate.get("evidence_state", ""), } ) edge_key = _edge_key(edge) if edge["type"] and edge_key not in existing_edges: existing_edges.add(edge_key) graph["edges"].append(edge) validate_graph_export(graph) return graph def _project_candidate_node(candidate: dict[str, Any], graph_id: str) -> dict[str, Any]: attributes = candidate.get("attributes") if isinstance(candidate.get("attributes"), dict) else {} canon_mapping = node_canon_mapping(str(candidate.get("kind") or "DiscoveredEntity")) return { "id": graph_id, "kind": str(candidate.get("kind") or "DiscoveredEntity"), "name": str(candidate.get("label") or graph_id), "repo": str(candidate.get("repo") or ""), "domain": str(candidate.get("domain") or ""), "lifecycle": str(candidate.get("lifecycle") or "active"), "canon_category": str(candidate.get("canon_category") or canon_mapping.category), "canon_anchor": str(candidate.get("canon_anchor") or canon_mapping.canon_anchor), "mapping_fit": str(candidate.get("mapping_fit") or canon_mapping.fit), "evidence_state": str(candidate.get("evidence_state") or "declared"), "attributes": { **attributes, "discovery_stable_key": candidate.get("stable_key"), "discovery_origin": candidate.get("origin"), "discovery_review_state": candidate.get("review_state"), "discovery_confidence": candidate.get("confidence"), "discovery_source_anchors": candidate.get("source_anchors", []), "discovery_provenance": candidate.get("provenance", []), }, } def _candidate_graph_id(candidate: dict[str, Any]) -> str: graph_id = str(candidate.get("graph_id") or "").strip() if graph_id: return graph_id repo = _graph_id_part(str(candidate.get("repo") or "repo")) kind = _graph_id_part(str(candidate.get("kind") or "entity")) label = _graph_id_part(str(candidate.get("label") or candidate.get("stable_key") or "candidate")) candidate_id = f"{repo}.{kind}.{label}".strip(".") if len(candidate_id) > 150: digest = hashlib.sha256(str(candidate.get("stable_key") or candidate_id).encode("utf-8")).hexdigest()[:10] candidate_id = f"{candidate_id[:139].rstrip('.')}.{digest}" if len(candidate_id) < 3: candidate_id = f"{candidate_id}.id" return candidate_id def _graph_id_part(value: str) -> str: text = re.sub(r"[^a-z0-9.-]+", "-", value.lower()).strip(".-") text = re.sub(r"-+", "-", text) text = re.sub(r"\.+", ".", text) return text or "unknown" def _candidate_is_accepted( candidate: dict[str, Any], accepted_keys: set[str], accept_review_states: set[str], ) -> bool: stable_key = str(candidate.get("stable_key") or "") if stable_key in accepted_keys: return True return str(candidate.get("review_state") or "") in accept_review_states def _discovery_candidates_by_key(snapshot: dict[str, Any], collection: str) -> dict[str, dict[str, Any]]: candidates = snapshot.get("candidates") if isinstance(snapshot.get("candidates"), dict) else {} return { str(item["stable_key"]): item for item in candidates.get(collection, []) if isinstance(item, dict) and item.get("stable_key") } def _without_review_noise(candidate: dict[str, Any]) -> dict[str, Any]: return { key: value for key, value in candidate.items() if key not in {"provenance"} } def _graph_diff(before: dict[str, Any], after: dict[str, Any]) -> dict[str, Any]: before_nodes = _nodes_by_id(before) after_nodes = _nodes_by_id(after) before_edges = {_edge_key(edge): edge for edge in before.get("edges", []) if isinstance(edge, dict)} after_edges = {_edge_key(edge): edge for edge in after.get("edges", []) if isinstance(edge, dict)} added_node_ids = sorted(set(after_nodes) - set(before_nodes)) removed_node_ids = sorted(set(before_nodes) - set(after_nodes)) common_node_ids = sorted(set(before_nodes) & set(after_nodes)) changed_node_ids = [ node_id for node_id in common_node_ids if _stable_json(before_nodes[node_id]) != _stable_json(after_nodes[node_id]) ] added_edge_keys = sorted(set(after_edges) - set(before_edges)) removed_edge_keys = sorted(set(before_edges) - set(after_edges)) return { "added_nodes": [after_nodes[node_id] for node_id in added_node_ids], "removed_nodes": [before_nodes[node_id] for node_id in removed_node_ids], "changed_nodes": [ { "id": node_id, "before": before_nodes[node_id], "after": after_nodes[node_id], } for node_id in changed_node_ids ], "added_edges": [after_edges[key] for key in added_edge_keys], "removed_edges": [before_edges[key] for key in removed_edge_keys], } def _edge_key(edge: dict[str, Any]) -> tuple[str, str, str]: return (str(edge.get("from", "")), str(edge.get("to", "")), str(edge.get("type", ""))) def _edge_with_canon_metadata(edge: dict[str, Any]) -> dict[str, Any]: edge_type = str(edge.get("type") or "") canon_mapping = edge_canon_mapping(edge_type) return { "from": str(edge.get("from", "")), "to": str(edge.get("to", "")), "type": edge_type, "canonical_type": str(edge.get("canonical_type") or canon_mapping.canonical_type), "canon_anchor": str(edge.get("canon_anchor") or canon_mapping.canon_anchor), "mapping_fit": str(edge.get("mapping_fit") or canon_mapping.fit), "display_only": bool(edge.get("display_only", canon_mapping.display_only)), "evidence_state": str(edge.get("evidence_state") or "declared"), "attributes": edge.get("attributes", {}) if isinstance(edge.get("attributes"), dict) else {}, } def _stable_json(value: Any) -> str: return json.dumps(value, sort_keys=True, separators=(",", ":")) def _matches(needle: str, value: Any) -> bool: return needle in _stable_json(value).lower() def _required_text(payload: dict[str, Any], key: str, fallback_key: str | None = None) -> str: value = payload.get(key) if value is None and fallback_key is not None: value = payload.get(fallback_key) if not isinstance(value, str) or not value.strip(): raise RegistryError(f"field '{key}' is required") return value.strip() def _optional_text(payload: dict[str, Any], key: str) -> str | None: value = payload.get(key) if value is None: return None if not isinstance(value, str): raise RegistryError(f"field '{key}' must be a string") return value def _utc_now() -> str: return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") def _nodes(graph: dict[str, Any]) -> list[dict[str, Any]]: return [node for node in graph.get("nodes", []) if isinstance(node, dict)] def _nodes_by_id(graph: dict[str, Any]) -> dict[str, dict[str, Any]]: return {str(node.get("id", "")): node for node in _nodes(graph)} def _attrs(node: dict[str, Any]) -> dict[str, Any]: attrs = node.get("attributes", {}) return attrs if isinstance(attrs, dict) else {} def _dependency_nodes(graph: dict[str, Any]) -> list[dict[str, Any]]: return [node for node in _nodes(graph) if node.get("kind") == "DependencyDeclaration"] def _dependency_attrs(graph: dict[str, Any], dependency_id: str) -> dict[str, Any]: node = _nodes_by_id(graph).get(dependency_id, {}) return _attrs(node) def _bindings_by_dependency(graph: dict[str, Any]) -> dict[str, list[dict[str, str]]]: result: dict[str, list[dict[str, str]]] = {} for node in _nodes(graph): if node.get("kind") != "BindingAssertion": continue attrs = _attrs(node) dependency_id = str(attrs.get("dependency_id", "")) if not dependency_id: continue result.setdefault(dependency_id, []).append( { "binding_id": str(node.get("id", "")), "provider_capability_id": str(attrs.get("provider_capability_id", "")), "provider_interface_id": str(attrs.get("provider_interface_id", "")), "status": str(attrs.get("status", "")), } ) for bindings in result.values(): bindings.sort(key=lambda item: item["binding_id"]) return result def _consumer_match(attrs: dict[str, Any], dependency_id: str, binding: dict[str, str]) -> dict[str, Any]: return { "consumer_service_id": attrs.get("consumer_service_id", ""), "dependency_id": dependency_id, "required_capability_type": attrs.get("requires_capability_type", ""), "provider_capability_id": binding.get("provider_capability_id", ""), "provider_interface_id": binding.get("provider_interface_id", ""), "status": binding.get("status", ""), } def _backstage_metadata(node: dict[str, Any]) -> dict[str, Any]: node_id = str(node.get("id", "")) metadata = { "name": _backstage_name(node_id), "title": node.get("name", node_id), "annotations": { "railiance.fabric/id": node_id, "railiance.fabric/repo": str(node.get("repo", "")), "railiance.fabric/kind": str(node.get("kind", "")), }, } return metadata def _backstage_owner(node: dict[str, Any]) -> str: repo = str(node.get("repo", "")) domain = str(node.get("domain", "")) return _backstage_name(repo or domain or "unknown") def _backstage_name(value: str) -> str: cleaned = "".join(char.lower() if char.isalnum() else "-" for char in value) cleaned = "-".join(part for part in cleaned.split("-") if part) return cleaned or "unknown" def _backstage_lifecycle(value: str) -> str: if value in {"active", "deprecated"}: return value if value == "retired": return "deprecated" return "experimental" def _backstage_api_type(interface_type: str) -> str: if interface_type == "http-api": return "openapi" if interface_type == "event-stream": return "asyncapi" return "other" def _xregistry_group(singular: str, plural: str) -> dict[str, Any]: return { "singular": singular, "plural": plural, "resources": {}, } def _xregistry_key(value: str) -> str: return value.replace("/", ".").strip(".") or "unknown"