Files
railiance-fabric/railiance_fabric/graph_explorer.py

754 lines
28 KiB
Python

from __future__ import annotations
from datetime import datetime, timezone
from re import sub
from typing import Any
from urllib.parse import urlparse
DISPLAY_STATES = ("show", "blur", "hide", "highlight", "remove")
LAYER_ORDER = (
"repository",
"server",
"deployment",
"service",
"capability",
"interface",
"dependency",
"binding",
"library",
)
_KIND_LAYER = {
"Repository": "repository",
"Server": "server",
"Deployment": "deployment",
"ServiceDeclaration": "service",
"CapabilityDeclaration": "capability",
"InterfaceDeclaration": "interface",
"DependencyDeclaration": "dependency",
"BindingAssertion": "binding",
"Library": "library",
}
_LAYER_COLORS = {
"repository": "#475569",
"server": "#334155",
"deployment": "#16a34a",
"service": "#0f766e",
"capability": "#2563eb",
"interface": "#7c3aed",
"dependency": "#b45309",
"binding": "#be123c",
"library": "#0891b2",
}
_EDGE_STRENGTH = {
"provides": "strong",
"exposes": "strong",
"available_via": "medium",
"consumes": "medium",
"uses_interface": "medium",
"declares": "weak",
"deployed_as": "medium",
"runs_on": "strong",
"owns_deployment": "weak",
}
def fabric_graph_explorer_manifest(base_url: str = "") -> dict[str, Any]:
"""Return the host manifest for the reusable graph explorer shell."""
return {
"apiVersion": "railiance.fabric/v1alpha1",
"kind": "GraphExplorerManifest",
"id": "railiance-fabric.registry-map",
"title": "Railiance Fabric Registry Map",
"description": "Manifest for exploring registered Fabric ecosystem entities.",
"engine": {
"suggested_package": "graph-explorer-engine",
"preferred_library": "cytoscape",
"display_state_owner": "host-or-engine",
},
"endpoints": {
"graph": {
"method": "GET",
"url": f"{base_url}/exports/graph-explorer",
},
"manifest": {
"method": "GET",
"url": f"{base_url}/exports/graph-explorer/manifest",
},
},
"identity": {
"node_id_field": "id",
"stable_key_field": "stableKey",
"edge_id_field": "id",
"source_field": "source",
"target_field": "target",
},
"layers": [
{
"id": layer,
"label": layer.replace("_", " ").title(),
"order": index,
"color": _LAYER_COLORS[layer],
}
for index, layer in enumerate(LAYER_ORDER)
],
"grouping_fields": [
"domain",
"repo",
"layer",
"kind",
"environment",
"serverHost",
"lifecycle",
"unresolved",
],
"search_fields": [
"id",
"stableKey",
"label",
"name",
"description",
"repo",
"domain",
"environment",
"serverHost",
"kind",
"layer",
"edgeType",
],
"filter": {
"actions": list(DISPLAY_STATES),
"fields": [
{"id": "kind", "label": "Kind", "type": "string"},
{"id": "layer", "label": "Node Type", "type": "string"},
{"id": "repo", "label": "Repo", "type": "string"},
{"id": "domain", "label": "Domain", "type": "string"},
{"id": "environment", "label": "Environment", "type": "string"},
{"id": "serverHost", "label": "Server Host", "type": "string"},
{"id": "lifecycle", "label": "Lifecycle", "type": "string"},
{"id": "reviewState", "label": "Review State", "type": "string"},
{"id": "unresolved", "label": "Unresolved", "type": "boolean"},
{"id": "edgeType", "label": "Edge Type", "type": "string"},
{"id": "strength", "label": "Strength", "type": "string"},
{"id": "sameLayer", "label": "Same Layer", "type": "boolean"},
{"id": "text", "label": "Text", "type": "string"},
],
},
"visual_encodings": {
"node_color": "layer",
"node_shape": "kind",
"node_size": "confidence",
"edge_width": "strength",
"edge_style": "sameLayer",
"edge_opacity": "displayState",
},
"detail": {
"node_fields": [
"name",
"kind",
"layer",
"repo",
"domain",
"lifecycle",
"unresolved",
"description",
"sourceReferences",
"deepLinks",
],
"edge_fields": [
"edgeType",
"strength",
"source",
"target",
"sourceLayer",
"targetLayer",
"sameLayer",
"deepLinks",
],
},
"modes": [
{
"id": "full",
"label": "Full",
"description": "Show the complete registered ecosystem graph.",
},
{
"id": "selected-path",
"label": "Selected Path",
"description": "Show selected nodes with predecessor and successor context.",
"requires_selection": True,
},
{
"id": "neighborhood",
"label": "Neighborhood",
"description": "Show the selected node neighborhood by configurable depth.",
"requires_selection": True,
},
{
"id": "onboarding-gaps",
"label": "Onboarding Gaps",
"description": "Highlight registered repos without accepted Fabric graph snapshots.",
},
{
"id": "unresolved",
"label": "Unresolved",
"description": "Highlight dependencies that have no accepted provider binding.",
},
],
"profile_persistence": "local",
"shareable_state": {
"url_parameters": True,
"profile_id": True,
"state_blob": True,
},
}
def fabric_graph_explorer_payload(
graph: dict[str, Any],
repositories: list[dict[str, Any]] | None = None,
snapshot_repo_slugs: set[str] | None = None,
) -> dict[str, Any]:
"""Project a Fabric graph export into the shared graph explorer payload."""
source_nodes = [node for node in graph.get("nodes", []) if isinstance(node, dict)]
source_edges = [edge for edge in graph.get("edges", []) if isinstance(edge, dict)]
repositories = repositories or []
snapshot_repo_slugs = snapshot_repo_slugs or set()
source_repo_slugs = {
str(node.get("repo", "")).strip()
for node in source_nodes
if str(node.get("repo", "")).strip()
}
registered_repo_slugs = {
str(repo.get("slug", "")).strip()
for repo in repositories
if str(repo.get("slug", "")).strip()
}
repo_slugs = set(source_repo_slugs)
repo_slugs.update(registered_repo_slugs)
repo_slugs.update(snapshot_repo_slugs)
repo_slugs.discard("")
unresolved = _unresolved_dependency_ids(source_nodes, source_edges)
elements: list[dict[str, Any]] = []
repository_index = {str(repo.get("slug", "")): repo for repo in repositories}
for slug in sorted(repo_slugs):
repo = repository_index.get(slug, {})
has_snapshot = slug in snapshot_repo_slugs or slug in source_repo_slugs
is_registered = slug in registered_repo_slugs
repo_id = f"repo:{slug}"
elements.append(
{
"data": {
"id": repo_id,
"stableKey": repo_id,
"kind": "Repository",
"layer": "repository",
"label": str(repo.get("name") or slug),
"name": str(repo.get("name") or slug),
"description": (
"Registered repository with accepted Fabric graph snapshot."
if has_snapshot
else "Registered repository without an accepted Fabric graph snapshot."
),
"repo": slug,
"domain": "railiance",
"lifecycle": "active" if has_snapshot else "registered-only",
"reviewState": "accepted" if has_snapshot else "candidate",
"freshnessState": "current" if has_snapshot else "missing",
"unresolved": is_registered and not has_snapshot,
"confidence": 1.0 if has_snapshot else 0.3,
"visualSize": 56 if has_snapshot else 42,
"ownership": "registry",
"displayState": "show",
"visibilitySource": "default",
"visibilityReason": "default",
"sourceReferences": [],
"deepLinks": _repository_links(repo),
},
"classes": "repository accepted" if has_snapshot else "repository candidate unresolved",
}
)
for node in source_nodes:
node_id = str(node.get("id", ""))
if not node_id:
continue
kind = str(node.get("kind", ""))
layer = _layer_for_kind(kind)
is_unresolved = node_id in unresolved
attributes = node.get("attributes") if isinstance(node.get("attributes"), dict) else {}
elements.append(
{
"data": {
"id": node_id,
"stableKey": node_id,
"kind": kind,
"layer": layer,
"label": str(node.get("name") or node_id),
"name": str(node.get("name") or node_id),
"description": _node_description(kind, attributes),
"repo": str(node.get("repo", "")),
"domain": str(node.get("domain", "")),
"lifecycle": str(node.get("lifecycle", "")),
"reviewState": "accepted",
"freshnessState": "current",
"unresolved": is_unresolved,
"confidence": 0.45 if is_unresolved else 1.0,
"visualSize": 34 if layer == "binding" else 46 if is_unresolved else 50,
"ownership": str(attributes.get("owner") or "repo"),
"attributes": attributes,
"displayState": "show",
"visibilitySource": "default",
"visibilityReason": "default",
"sourceReferences": _source_references(node),
"deepLinks": _node_links(node_id, attributes),
},
"classes": " ".join(
part
for part in (layer, kind, "unresolved" if is_unresolved else "accepted")
if part
),
}
)
node_layers = _node_data_index(elements, "layer")
node_repos = _node_data_index(elements, "repo")
edge_index = _append_infrastructure_elements(
source_nodes,
elements,
node_layers,
node_repos,
repo_slugs,
)
for edge in source_edges:
source = str(edge.get("from", ""))
target = str(edge.get("to", ""))
edge_type = str(edge.get("type", ""))
if not source or not target:
continue
elements.append(_edge_element(edge_index, source, target, edge_type, node_layers, node_repos))
edge_index += 1
for slug in sorted(repo_slugs):
repo_id = f"repo:{slug}"
for node in source_nodes:
node_id = str(node.get("id", ""))
if str(node.get("repo", "")) != slug or not node_id:
continue
elements.append(_edge_element(edge_index, repo_id, node_id, "declares", node_layers, node_repos))
edge_index += 1
visible_nodes = [element for element in elements if "source" not in element["data"]]
visible_edges = [element for element in elements if "source" in element["data"]]
return {
"apiVersion": "railiance.fabric/v1alpha1",
"kind": "GraphExplorerPayload",
"manifest_id": "railiance-fabric.registry-map",
"generated_at": _utc_now(),
"repository": {"slug": "registry", "name": "Railiance Fabric Registry"},
"mode": "full",
"profile": None,
"metrics": {
"node_count": len(visible_nodes),
"edge_count": len(visible_edges),
"hidden_count": 0,
"blurred_count": 0,
"registered_repo_count": len(registered_repo_slugs),
"repo_node_count": len(repo_slugs),
"deployment_node_count": sum(1 for element in visible_nodes if element["data"].get("kind") == "Deployment"),
"server_node_count": sum(1 for element in visible_nodes if element["data"].get("kind") == "Server"),
"registered_only_repo_count": sum(
1
for element in visible_nodes
if element["data"].get("repo") in registered_repo_slugs
and element["data"].get("lifecycle") == "registered-only"
),
"unresolved_count": len(unresolved),
},
"filter": {
"rules": [],
"manual_overrides": {},
"orphaned_overrides": [],
"precedence": (
"later rules override earlier rules; remove excludes matching elements "
"from layout and wins over manual overrides; otherwise manual overrides win last"
),
"connected_edge_behavior": (
"edges connected to hidden nodes are hidden; edges connected to removed nodes are removed"
),
},
"elements": elements,
"hidden_elements": [],
}
def _utc_now() -> str:
return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
def _layer_for_kind(kind: str) -> str:
return _KIND_LAYER.get(kind, kind.lower() or "unknown")
def _edge_strength(edge_type: str) -> str:
if edge_type.startswith("binds:"):
status = edge_type.split(":", 1)[1]
if status in {"missing", "disputed"}:
return "weak"
if status in {"accepted", "exact"}:
return "strong"
return "medium"
return _EDGE_STRENGTH.get(edge_type, "medium")
def _edge_width(strength: str) -> int:
return {"weak": 1, "medium": 3, "strong": 5}.get(strength, 2)
def _node_data_index(elements: list[dict[str, Any]], field: str) -> dict[str, str]:
index: dict[str, str] = {}
for element in elements:
data = element.get("data", {})
if isinstance(data, dict) and "source" not in data:
node_id = str(data.get("id") or "")
if node_id:
index[node_id] = str(data.get(field) or "")
return index
def _append_infrastructure_elements(
source_nodes: list[dict[str, Any]],
elements: list[dict[str, Any]],
node_layers: dict[str, str],
node_repos: dict[str, str],
repo_slugs: set[str],
) -> int:
edge_index = 0
endpoints_by_service = _endpoints_by_service(source_nodes)
server_ids_by_host: dict[str, str] = {}
service_nodes = sorted(
(node for node in source_nodes if node.get("kind") == "ServiceDeclaration"),
key=lambda node: str(node.get("id", "")),
)
for service in service_nodes:
service_id = str(service.get("id", ""))
if not service_id:
continue
attributes = service.get("attributes") if isinstance(service.get("attributes"), dict) else {}
environments = _environments(attributes)
repo = str(service.get("repo") or "")
service_name = str(service.get("name") or service_id)
service_endpoints = endpoints_by_service.get(service_id, [])
for environment in environments:
deployment_id = f"deployment:{service_id}:{environment}"
matching_endpoints = [
endpoint
for endpoint in service_endpoints
if _environment_matches(environment, endpoint["environments"])
]
server_hosts = sorted({endpoint["host"] for endpoint in matching_endpoints})
deployment_data = {
"id": deployment_id,
"stableKey": deployment_id,
"kind": "Deployment",
"layer": "deployment",
"label": f"{service_name} [{environment}]",
"name": f"{service_name} deployment ({environment})",
"description": f"{service_name} deployment in {environment}.",
"repo": repo,
"domain": str(service.get("domain") or ""),
"lifecycle": str(service.get("lifecycle") or ""),
"environment": environment,
"serviceId": service_id,
"serverHosts": server_hosts,
"reviewState": "accepted",
"freshnessState": "current",
"unresolved": False,
"confidence": 0.85 if server_hosts else 0.65,
"visualSize": 42,
"ownership": str(attributes.get("owner") or "repo"),
"attributes": {
"service_id": service_id,
"environment": environment,
"server_hosts": server_hosts,
"source_service": service_id,
},
"displayState": "show",
"visibilitySource": "default",
"visibilityReason": "default",
"sourceReferences": _source_references(service),
"deepLinks": {"service": f"/graph/nodes/{service_id}"},
}
elements.append({"data": deployment_data, "classes": "deployment accepted"})
node_layers[deployment_id] = "deployment"
node_repos[deployment_id] = repo
elements.append(_edge_element(edge_index, service_id, deployment_id, "deployed_as", node_layers, node_repos))
edge_index += 1
if repo and repo in repo_slugs:
repo_id = f"repo:{repo}"
elements.append(_edge_element(edge_index, repo_id, deployment_id, "owns_deployment", node_layers, node_repos))
edge_index += 1
for endpoint in matching_endpoints:
server_id = server_ids_by_host.get(endpoint["host"])
if server_id is None:
server_id = _server_id(endpoint["host"])
server_ids_by_host[endpoint["host"]] = server_id
server_data = {
"id": server_id,
"stableKey": server_id,
"kind": "Server",
"layer": "server",
"label": endpoint["host"],
"name": endpoint["host"],
"description": f"Server inferred from endpoint {endpoint['url']}.",
"repo": "",
"domain": str(service.get("domain") or ""),
"lifecycle": "active",
"environment": environment,
"serverHost": endpoint["host"],
"reviewState": "accepted",
"freshnessState": "current",
"unresolved": False,
"confidence": 0.7,
"visualSize": 48,
"ownership": "inferred",
"attributes": {
"host": endpoint["host"],
"source_interface_id": endpoint["interface_id"],
"endpoint_url": endpoint["url"],
},
"displayState": "show",
"visibilitySource": "default",
"visibilityReason": "default",
"sourceReferences": [{"label": "Endpoint interface", "ref": endpoint["interface_id"]}],
"deepLinks": {"interface": f"/graph/nodes/{endpoint['interface_id']}"},
}
elements.append({"data": server_data, "classes": "server accepted inferred"})
node_layers[server_id] = "server"
node_repos[server_id] = ""
elements.append(_edge_element(edge_index, deployment_id, server_id, "runs_on", node_layers, node_repos))
edge_index += 1
return edge_index
def _endpoints_by_service(source_nodes: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
endpoints: dict[str, list[dict[str, Any]]] = {}
for node in source_nodes:
if node.get("kind") != "InterfaceDeclaration":
continue
attributes = node.get("attributes") if isinstance(node.get("attributes"), dict) else {}
endpoint = attributes.get("endpoint") if isinstance(attributes.get("endpoint"), dict) else {}
url = str(endpoint.get("url") or "").strip()
host = _endpoint_host(url)
service_id = str(attributes.get("service_id") or "")
if not service_id or not host:
continue
endpoints.setdefault(service_id, []).append(
{
"host": host,
"url": url,
"interface_id": str(node.get("id") or ""),
"environments": _environments(attributes),
}
)
return endpoints
def _environments(attributes: dict[str, Any]) -> list[str]:
environments = [
str(environment)
for environment in attributes.get("environments", [])
if str(environment).strip()
]
return environments or ["all"]
def _environment_matches(deployment_environment: str, endpoint_environments: list[str]) -> bool:
return (
deployment_environment == "all"
or "all" in endpoint_environments
or deployment_environment in endpoint_environments
)
def _endpoint_host(url: str) -> str:
if not url:
return ""
parsed = urlparse(url)
host = parsed.netloc or parsed.path.split("/", 1)[0]
return host.strip().lower()
def _server_id(host: str) -> str:
key = sub(r"[^A-Za-z0-9._:+-]+", "-", host.lower()).strip("-")
return f"server:{key or 'unknown'}"
def _edge_element(
edge_index: int,
source: str,
target: str,
edge_type: str,
node_layers: dict[str, str],
node_repos: dict[str, str],
) -> dict[str, Any]:
source_layer = node_layers.get(source, "unknown")
target_layer = node_layers.get(target, "unknown")
source_repo = node_repos.get(source, "")
target_repo = node_repos.get(target, "")
same_repo = bool(source_repo and source_repo == target_repo)
strength = _edge_strength(edge_type)
layout = _layout_hints(edge_type, source_layer, target_layer, same_repo)
edge_id = f"edge:{edge_index}:{source}:{edge_type}:{target}"
return {
"data": {
"id": edge_id,
"stableKey": edge_id,
"kind": "edge",
"layer": "relationship",
"label": edge_type,
"source": source,
"target": target,
"sourceLayer": source_layer,
"targetLayer": target_layer,
"sourceRepo": source_repo,
"targetRepo": target_repo,
"edgeType": edge_type,
"dependencyType": edge_type,
"strength": strength,
"edgeWidth": _edge_width(strength),
"sameLayer": source_layer == target_layer,
"sameRepo": same_repo,
"layoutAffinity": layout["affinity"],
"layoutIdealLength": layout["ideal_length"],
"layoutElasticity": layout["elasticity"],
"reviewState": "accepted",
"freshnessState": "current",
"displayState": "show",
"visibilitySource": "default",
"visibilityReason": "default",
"deepLinks": {},
},
"classes": " ".join(
part
for part in (
edge_type.replace(":", "-"),
strength,
str(layout["affinity"]),
"same-layer" if source_layer == target_layer else "",
"same-repo" if same_repo else "",
)
if part
),
}
def _layout_hints(
edge_type: str,
source_layer: str,
target_layer: str,
same_repo: bool,
) -> dict[str, int | str]:
if edge_type == "runs_on":
return {"affinity": "deployment-server", "ideal_length": 42, "elasticity": 240}
if edge_type == "deployed_as":
return {"affinity": "service-deployment", "ideal_length": 58, "elasticity": 180}
if edge_type == "owns_deployment":
return {"affinity": "repo-deployment", "ideal_length": 150, "elasticity": 28}
if edge_type == "declares":
return {"affinity": "repo-declaration", "ideal_length": 88, "elasticity": 110}
if same_repo:
return {"affinity": "same-repo", "ideal_length": 72, "elasticity": 150}
if "repository" in {source_layer, target_layer}:
return {"affinity": "repo-loose", "ideal_length": 130, "elasticity": 45}
return {"affinity": "cross-repo", "ideal_length": 190, "elasticity": 22}
def _unresolved_dependency_ids(
nodes: list[dict[str, Any]],
edges: list[dict[str, Any]],
) -> set[str]:
dependency_ids = {
str(node.get("id", ""))
for node in nodes
if node.get("kind") == "DependencyDeclaration" and str(node.get("id", ""))
}
resolved: set[str] = set()
unresolved: set[str] = set()
for edge in edges:
edge_type = str(edge.get("type", ""))
source = str(edge.get("from", ""))
if source not in dependency_ids or not edge_type.startswith("binds:"):
continue
status = edge_type.split(":", 1)[1]
if status in {"accepted", "candidate", "exact", "compatible"}:
resolved.add(source)
if status in {"missing", "disputed"}:
unresolved.add(source)
return (dependency_ids - resolved) | unresolved
def _node_description(kind: str, attributes: object) -> str:
if not isinstance(attributes, dict):
return ""
description = str(attributes.get("description", ""))
if description:
return description
if kind == "CapabilityDeclaration":
return str(attributes.get("capability_type", ""))
if kind == "InterfaceDeclaration":
version = str(attributes.get("version", ""))
interface_type = str(attributes.get("interface_type", ""))
return " ".join(part for part in (interface_type, version) if part)
if kind == "DependencyDeclaration":
return str(
attributes.get("requires_capability_type")
or attributes.get("requires_capability_id")
or attributes.get("interface_type")
or ""
)
if kind == "BindingAssertion":
return str(attributes.get("status", ""))
return ""
def _source_references(node: dict[str, Any]) -> list[dict[str, str]]:
attributes = node.get("attributes")
references: list[dict[str, str]] = []
if isinstance(attributes, dict):
source_path = str(attributes.get("source_path") or "")
if source_path:
references.append({"label": "Declaration", "path": source_path})
for source in attributes.get("source_links", []):
if isinstance(source, dict):
references.append({key: str(value) for key, value in source.items()})
return references
def _node_links(node_id: str, attributes: dict[str, Any] | None = None) -> dict[str, str]:
links = {"registry": f"/graph/nodes/{node_id}"}
source_path = str((attributes or {}).get("source_path") or "")
if source_path:
links["sourceFile"] = source_path
return links
def _repository_links(repository: dict[str, Any]) -> dict[str, str]:
slug = str(repository.get("slug", ""))
links = {"registry": f"/repositories/{slug}"} if slug else {}
state_hub_repo_id = str(repository.get("state_hub_repo_id") or "")
if state_hub_repo_id:
links["stateHubRepo"] = f"/repos/by-id/{state_hub_repo_id}"
return links