"""Graph explorer projections for the Railiance Fabric registry.

This module builds two plain-dict documents:

* ``fabric_graph_explorer_manifest`` -- a static manifest describing layers,
  filters, visual encodings and endpoints for a graph explorer shell.
* ``fabric_graph_explorer_payload`` -- a projection of a Fabric graph export
  (``{"nodes": [...], "edges": [...]}``) into a flat list of elements, each
  with a ``data`` mapping and a ``classes`` string.
"""

from __future__ import annotations

from datetime import datetime, timezone
from typing import Any

DISPLAY_STATES = ("show", "blur", "hide")

LAYER_ORDER = (
    "repository",
    "service",
    "capability",
    "interface",
    "dependency",
    "binding",
    "library",
)

# Maps a source node ``kind`` to the explorer layer it is drawn in.
_KIND_LAYER = {
    "Repository": "repository",
    "ServiceDeclaration": "service",
    "CapabilityDeclaration": "capability",
    "InterfaceDeclaration": "interface",
    "DependencyDeclaration": "dependency",
    "BindingAssertion": "binding",
    "Library": "library",
}

# One colour per entry of LAYER_ORDER; the manifest indexes this by layer id.
_LAYER_COLORS = {
    "repository": "#475569",
    "service": "#0f766e",
    "capability": "#2563eb",
    "interface": "#7c3aed",
    "dependency": "#b45309",
    "binding": "#be123c",
    "library": "#0891b2",
}

# Strength for non-``binds:`` edge types; unknown types fall back to "medium".
_EDGE_STRENGTH = {
    "provides": "strong",
    "exposes": "strong",
    "available_via": "medium",
    "consumes": "medium",
    "uses_interface": "medium",
    "declares": "weak",
}


def fabric_graph_explorer_manifest(base_url: str = "") -> dict[str, Any]:
    """Return the host manifest for the reusable graph explorer shell.

    Args:
        base_url: Prefix for the endpoint URLs. Trailing slashes are removed
            so that ``"https://host/"`` and ``"https://host"`` produce the
            same URLs (and ``"/"`` does not yield a ``//exports/...`` URL).

    Returns:
        A new manifest dict on every call.
    """
    root = base_url.rstrip("/")
    return {
        "apiVersion": "railiance.fabric/v1alpha1",
        "kind": "GraphExplorerManifest",
        "id": "railiance-fabric.registry-map",
        "title": "Railiance Fabric Registry Map",
        "description": "Manifest for exploring registered Fabric ecosystem entities.",
        "engine": {
            "suggested_package": "graph-explorer-engine",
            "preferred_library": "cytoscape",
            "display_state_owner": "host-or-engine",
        },
        "endpoints": {
            "graph": {
                "method": "GET",
                "url": f"{root}/exports/graph-explorer",
            },
            "manifest": {
                "method": "GET",
                "url": f"{root}/exports/graph-explorer/manifest",
            },
        },
        "identity": {
            "node_id_field": "id",
            "stable_key_field": "stableKey",
            "edge_id_field": "id",
            "source_field": "source",
            "target_field": "target",
        },
        "layers": [
            {
                "id": layer,
                "label": layer.replace("_", " ").title(),
                "order": index,
                "color": _LAYER_COLORS[layer],
            }
            for index, layer in enumerate(LAYER_ORDER)
        ],
        "grouping_fields": ["domain", "repo", "layer", "kind", "lifecycle", "unresolved"],
        "search_fields": [
            "id",
            "stableKey",
            "label",
            "name",
            "description",
            "repo",
            "domain",
            "kind",
            "layer",
            "edgeType",
        ],
        "filter": {
            "actions": list(DISPLAY_STATES),
            "fields": [
                {"id": "kind", "label": "Kind", "type": "string"},
                {"id": "layer", "label": "Layer", "type": "string"},
                {"id": "repo", "label": "Repo", "type": "string"},
                {"id": "domain", "label": "Domain", "type": "string"},
                {"id": "lifecycle", "label": "Lifecycle", "type": "string"},
                {"id": "reviewState", "label": "Review State", "type": "string"},
                {"id": "unresolved", "label": "Unresolved", "type": "boolean"},
                {"id": "edgeType", "label": "Edge Type", "type": "string"},
                {"id": "strength", "label": "Strength", "type": "string"},
                {"id": "sameLayer", "label": "Same Layer", "type": "boolean"},
                {"id": "text", "label": "Text", "type": "string"},
            ],
        },
        "visual_encodings": {
            "node_color": "layer",
            "node_shape": "kind",
            "node_size": "confidence",
            "edge_width": "strength",
            "edge_style": "sameLayer",
            "edge_opacity": "displayState",
        },
        "detail": {
            "node_fields": [
                "name",
                "kind",
                "layer",
                "repo",
                "domain",
                "lifecycle",
                "unresolved",
                "description",
                "sourceReferences",
                "deepLinks",
            ],
            "edge_fields": [
                "edgeType",
                "strength",
                "source",
                "target",
                "sourceLayer",
                "targetLayer",
                "sameLayer",
                "deepLinks",
            ],
        },
        "modes": [
            {
                "id": "full",
                "label": "Full",
                "description": "Show the complete registered ecosystem graph.",
            },
            {
                "id": "selected-path",
                "label": "Selected Path",
                "description": "Show selected nodes with predecessor and successor context.",
                "requires_selection": True,
            },
            {
                "id": "neighborhood",
                "label": "Neighborhood",
                "description": "Show the selected node neighborhood by configurable depth.",
                "requires_selection": True,
            },
            {
                "id": "onboarding-gaps",
                "label": "Onboarding Gaps",
                "description": "Highlight registered repos without accepted Fabric graph snapshots.",
            },
            {
                "id": "unresolved",
                "label": "Unresolved",
                "description": "Highlight dependencies that have no accepted provider binding.",
            },
        ],
        "profile_persistence": "local",
        "shareable_state": {
            "url_parameters": True,
            "profile_id": True,
            "state_blob": True,
        },
    }


def fabric_graph_explorer_payload(
    graph: dict[str, Any],
    repositories: list[dict[str, Any]] | None = None,
    snapshot_repo_slugs: set[str] | None = None,
) -> dict[str, Any]:
    """Project a Fabric graph export into the shared graph explorer payload.

    Args:
        graph: Mapping with optional ``"nodes"`` and ``"edges"`` lists. Entries
            that are not dicts are ignored. Nodes are read via ``id``, ``kind``,
            ``name``, ``repo``, ``domain``, ``lifecycle`` and ``attributes``;
            edges via ``from``, ``to`` and ``type``.
        repositories: Registered repositories (dicts with at least ``slug``).
            Non-dict entries are ignored.
        snapshot_repo_slugs: Slugs of repositories that have a snapshot even
            when no node in ``graph`` references them.

    Returns:
        A payload dict whose ``elements`` list contains, in order: one element
        per repository slug (sorted), one per source node with a non-empty id,
        one per source edge with both endpoints set, and one ``declares`` edge
        from each repository to every node it owns.
    """
    source_nodes = [node for node in graph.get("nodes") or [] if isinstance(node, dict)]
    source_edges = [edge for edge in graph.get("edges") or [] if isinstance(edge, dict)]
    snapshot_slugs = set(snapshot_repo_slugs or ())

    # Key the registry lookup by the *stripped* slug, the same normalisation
    # used for repository ids, so padded slugs still resolve to their record.
    repository_index: dict[str, dict[str, Any]] = {}
    for repo in repositories or []:
        if not isinstance(repo, dict):
            continue
        slug = str(repo.get("slug", "")).strip()
        if slug:
            repository_index[slug] = repo
    registered_repo_slugs = set(repository_index)

    layers_by_id = {
        str(node.get("id", "")): _layer_for_kind(str(node.get("kind", "")))
        for node in source_nodes
    }

    # Single pass over the nodes: collect the repos they mention and group the
    # node ids per repo (in source order) for the "declares" edges below.
    source_repo_slugs: set[str] = set()
    node_ids_by_repo: dict[str, list[str]] = {}
    for node in source_nodes:
        slug = str(node.get("repo", "")).strip()
        if not slug:
            continue
        source_repo_slugs.add(slug)
        node_id = str(node.get("id", ""))
        if node_id:
            node_ids_by_repo.setdefault(slug, []).append(node_id)

    repo_slugs = source_repo_slugs | registered_repo_slugs | snapshot_slugs
    repo_slugs.discard("")
    ordered_slugs = sorted(repo_slugs)

    unresolved = _unresolved_dependency_ids(source_nodes, source_edges)
    elements: list[dict[str, Any]] = []

    registered_only_repo_count = 0
    for slug in ordered_slugs:
        has_snapshot = slug in snapshot_slugs or slug in source_repo_slugs
        is_registered = slug in registered_repo_slugs
        if is_registered and not has_snapshot:
            registered_only_repo_count += 1
        elements.append(
            _repository_element(slug, repository_index.get(slug, {}), has_snapshot, is_registered)
        )

    for node in source_nodes:
        node_id = str(node.get("id", ""))
        if node_id:
            elements.append(_node_element(node, node_id, node_id in unresolved))

    # Edge ids embed a running index so parallel edges stay unique; the index
    # only advances for edges that are actually emitted.
    edge_index = 0
    for edge in source_edges:
        source = str(edge.get("from", ""))
        target = str(edge.get("to", ""))
        edge_type = str(edge.get("type", ""))
        if not source or not target:
            continue
        source_layer = layers_by_id.get(source, "unknown")
        target_layer = layers_by_id.get(target, "unknown")
        elements.append(
            _edge_element(
                f"edge:{edge_index}:{source}:{edge_type}:{target}",
                edge_type,
                source,
                target,
                source_layer,
                target_layer,
                _edge_strength(edge_type),
                source_layer == target_layer,
            )
        )
        edge_index += 1

    for slug in ordered_slugs:
        repo_id = f"repo:{slug}"
        for node_id in node_ids_by_repo.get(slug, ()):
            elements.append(
                _edge_element(
                    f"edge:{edge_index}:{repo_id}:declares:{node_id}",
                    "declares",
                    repo_id,
                    node_id,
                    "repository",
                    layers_by_id.get(node_id, "unknown"),
                    "weak",
                    False,  # ownership edges are never flagged as same-layer
                )
            )
            edge_index += 1

    # Edges are the only elements whose data carries a "source" key.
    edge_count = sum(1 for element in elements if "source" in element["data"])
    node_count = len(elements) - edge_count

    return {
        "apiVersion": "railiance.fabric/v1alpha1",
        "kind": "GraphExplorerPayload",
        "manifest_id": "railiance-fabric.registry-map",
        "generated_at": _utc_now(),
        "repository": {"slug": "registry", "name": "Railiance Fabric Registry"},
        "mode": "full",
        "profile": None,
        "metrics": {
            "node_count": node_count,
            "edge_count": edge_count,
            "hidden_count": 0,
            "blurred_count": 0,
            "registered_repo_count": len(registered_repo_slugs),
            "repo_node_count": len(repo_slugs),
            "registered_only_repo_count": registered_only_repo_count,
            "unresolved_count": len(unresolved),
        },
        "filter": {
            "rules": [],
            "manual_overrides": {},
            "orphaned_overrides": [],
            "precedence": "later rules override earlier rules; manual overrides win last",
            "connected_edge_behavior": "edges connected to hidden nodes are hidden",
        },
        "elements": elements,
        "hidden_elements": [],
    }


def _repository_element(
    slug: str,
    repo: dict[str, Any],
    has_snapshot: bool,
    is_registered: bool,
) -> dict[str, Any]:
    """Build the explorer element for one repository slug."""
    repo_id = f"repo:{slug}"
    display_name = str(repo.get("name") or slug)
    return {
        "data": {
            "id": repo_id,
            "stableKey": repo_id,
            "kind": "Repository",
            "layer": "repository",
            "label": display_name,
            "name": display_name,
            "description": (
                "Registered repository with accepted Fabric graph snapshot."
                if has_snapshot
                else "Registered repository without an accepted Fabric graph snapshot."
            ),
            "repo": slug,
            "domain": "railiance",
            "lifecycle": "active" if has_snapshot else "registered-only",
            "reviewState": "accepted" if has_snapshot else "candidate",
            "freshnessState": "current" if has_snapshot else "missing",
            "unresolved": is_registered and not has_snapshot,
            "confidence": 1.0 if has_snapshot else 0.3,
            "visualSize": 56 if has_snapshot else 42,
            "ownership": "registry",
            "displayState": "show",
            "visibilitySource": "default",
            "visibilityReason": "default",
            "sourceReferences": [],
            "deepLinks": _repository_links(repo),
        },
        "classes": "repository accepted" if has_snapshot else "repository candidate unresolved",
    }


def _node_element(node: dict[str, Any], node_id: str, is_unresolved: bool) -> dict[str, Any]:
    """Build the explorer element for one source graph node."""
    kind = str(node.get("kind", ""))
    layer = _layer_for_kind(kind)
    raw_attributes = node.get("attributes")
    attributes = raw_attributes if isinstance(raw_attributes, dict) else {}
    display_name = str(node.get("name") or node_id)
    if layer == "binding":
        visual_size = 34
    elif is_unresolved:
        visual_size = 46
    else:
        visual_size = 50
    return {
        "data": {
            "id": node_id,
            "stableKey": node_id,
            "kind": kind,
            "layer": layer,
            "label": display_name,
            "name": display_name,
            "description": _node_description(kind, attributes),
            "repo": str(node.get("repo", "")),
            "domain": str(node.get("domain", "")),
            "lifecycle": str(node.get("lifecycle", "")),
            "reviewState": "accepted",
            "freshnessState": "current",
            "unresolved": is_unresolved,
            "confidence": 0.45 if is_unresolved else 1.0,
            "visualSize": visual_size,
            "ownership": str(attributes.get("owner") or "repo"),
            "attributes": attributes,
            "displayState": "show",
            "visibilitySource": "default",
            "visibilityReason": "default",
            "sourceReferences": _source_references(node),
            "deepLinks": _node_links(node_id, attributes),
        },
        "classes": " ".join(
            part
            for part in (layer, kind, "unresolved" if is_unresolved else "accepted")
            if part
        ),
    }


def _edge_element(
    edge_id: str,
    edge_type: str,
    source: str,
    target: str,
    source_layer: str,
    target_layer: str,
    strength: str,
    same_layer: bool,
) -> dict[str, Any]:
    """Build the explorer element for one edge (source edge or ``declares``)."""
    return {
        "data": {
            "id": edge_id,
            "stableKey": edge_id,
            "kind": "edge",
            "layer": "relationship",
            "label": edge_type,
            "source": source,
            "target": target,
            "sourceLayer": source_layer,
            "targetLayer": target_layer,
            "edgeType": edge_type,
            "dependencyType": edge_type,
            "strength": strength,
            "edgeWidth": _edge_width(strength),
            "sameLayer": same_layer,
            "reviewState": "accepted",
            "freshnessState": "current",
            "displayState": "show",
            "visibilitySource": "default",
            "visibilityReason": "default",
            "deepLinks": {},
        },
        # ":" is replaced because edge types such as "binds:accepted" are
        # emitted as space-separated class tokens.
        "classes": " ".join(
            part
            for part in (
                edge_type.replace(":", "-"),
                strength,
                "same-layer" if same_layer else "",
            )
            if part
        ),
    }


def _utc_now() -> str:
    """Return the current UTC time as ISO 8601 with second precision and a ``Z`` suffix."""
    return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")


def _layer_for_kind(kind: str) -> str:
    """Map a node kind to its layer; unknown kinds use the lower-cased kind, or ``"unknown"`` if empty."""
    return _KIND_LAYER.get(kind, kind.lower() or "unknown")


def _edge_strength(edge_type: str) -> str:
    """Classify an edge type as ``"weak"``, ``"medium"`` or ``"strong"``.

    ``binds:<status>`` edges are graded by status; every other type is looked
    up in ``_EDGE_STRENGTH`` and defaults to ``"medium"``.
    """
    if edge_type.startswith("binds:"):
        status = edge_type.split(":", 1)[1]
        if status in {"missing", "disputed"}:
            return "weak"
        if status in {"accepted", "exact"}:
            return "strong"
        return "medium"
    return _EDGE_STRENGTH.get(edge_type, "medium")


def _edge_width(strength: str) -> int:
    """Return the edge width for a strength label (2 for unrecognised labels)."""
    return {"weak": 1, "medium": 3, "strong": 5}.get(strength, 2)


def _unresolved_dependency_ids(
    nodes: list[dict[str, Any]],
    edges: list[dict[str, Any]],
) -> set[str]:
    """Return ids of ``DependencyDeclaration`` nodes that count as unresolved.

    A dependency is unresolved when it has no outgoing ``binds:`` edge with
    status accepted/candidate/exact/compatible, or when it has any outgoing
    ``binds:missing`` / ``binds:disputed`` edge.
    """
    dependency_ids = {
        str(node.get("id", ""))
        for node in nodes
        if node.get("kind") == "DependencyDeclaration" and str(node.get("id", ""))
    }
    resolved: set[str] = set()
    unresolved: set[str] = set()
    for edge in edges:
        edge_type = str(edge.get("type", ""))
        source = str(edge.get("from", ""))
        if source not in dependency_ids or not edge_type.startswith("binds:"):
            continue
        status = edge_type.split(":", 1)[1]
        if status in {"accepted", "candidate", "exact", "compatible"}:
            resolved.add(source)
        if status in {"missing", "disputed"}:
            unresolved.add(source)
    return (dependency_ids - resolved) | unresolved


def _node_description(kind: str, attributes: object) -> str:
    """Return a short description for a node.

    An explicit ``description`` attribute wins; otherwise a kind-specific
    attribute is used. Returns ``""`` when nothing applies or ``attributes``
    is not a dict.
    """
    if not isinstance(attributes, dict):
        return ""
    description = str(attributes.get("description", ""))
    if description:
        return description
    if kind == "CapabilityDeclaration":
        return str(attributes.get("capability_type", ""))
    if kind == "InterfaceDeclaration":
        version = str(attributes.get("version", ""))
        interface_type = str(attributes.get("interface_type", ""))
        return " ".join(part for part in (interface_type, version) if part)
    if kind == "DependencyDeclaration":
        return str(
            attributes.get("requires_capability_type")
            or attributes.get("requires_capability_id")
            or attributes.get("interface_type")
            or ""
        )
    if kind == "BindingAssertion":
        return str(attributes.get("status", ""))
    return ""


def _source_references(node: dict[str, Any]) -> list[dict[str, str]]:
    """Collect source references from a node's ``attributes``.

    The ``source_path`` attribute (if set) becomes a ``Declaration`` entry,
    followed by every dict in ``source_links`` with its values stringified.
    A missing, ``None`` or non-list ``source_links`` contributes nothing.
    """
    attributes = node.get("attributes")
    references: list[dict[str, str]] = []
    if not isinstance(attributes, dict):
        return references
    source_path = str(attributes.get("source_path") or "")
    if source_path:
        references.append({"label": "Declaration", "path": source_path})
    source_links = attributes.get("source_links")
    # Guard against ``source_links: null`` (or a scalar), which would
    # otherwise raise TypeError when iterated.
    if isinstance(source_links, (list, tuple)):
        references.extend(
            {key: str(value) for key, value in source.items()}
            for source in source_links
            if isinstance(source, dict)
        )
    return references


def _node_links(node_id: str, attributes: dict[str, Any] | None = None) -> dict[str, str]:
    """Return deep links for a node: its registry path plus ``sourceFile`` when known."""
    links = {"registry": f"/graph/nodes/{node_id}"}
    source_path = str((attributes or {}).get("source_path") or "")
    if source_path:
        links["sourceFile"] = source_path
    return links


def _repository_links(repository: dict[str, Any]) -> dict[str, str]:
    """Return deep links for a repository record (empty for an empty record).

    The slug is stripped so the link matches the normalised slug used for the
    repository element id.
    """
    slug = str(repository.get("slug", "")).strip()
    links = {"registry": f"/repositories/{slug}"} if slug else {}
    state_hub_repo_id = str(repository.get("state_hub_repo_id") or "")
    if state_hub_repo_id:
        links["stateHubRepo"] = f"/repos/by-id/{state_hub_repo_id}"
    return links