Files
railiance-fabric/railiance_fabric/graph_explorer.py

1180 lines
46 KiB
Python

from __future__ import annotations
import ipaddress
from datetime import datetime, timezone
from re import sub
from typing import Any
from urllib.parse import urlparse
from .canon import edge_canon_mapping
from .deployment_overlay import graph_explorer_overlay_fields, normalize_deployment_overlay
DISPLAY_STATES = ("show", "blur", "hide", "highlight", "remove")
LAYER_ORDER = (
"repository",
"server",
"runtime_service",
"application",
"network",
"domain",
"deployment",
"service",
"capability",
"interface",
"dependency",
"binding",
"library",
)
_KIND_LAYER = {
"Repository": "repository",
"Server": "server",
"RuntimeService": "runtime_service",
"ApplicationEndpoint": "application",
"NetworkPort": "network",
"DomainName": "domain",
"Deployment": "deployment",
"ServiceDeclaration": "service",
"CapabilityDeclaration": "capability",
"InterfaceDeclaration": "interface",
"DependencyDeclaration": "dependency",
"BindingAssertion": "binding",
"Library": "library",
}
_LAYER_COLORS = {
"repository": "#475569",
"server": "#334155",
"runtime_service": "#0369a1",
"application": "#0f766e",
"network": "#64748b",
"domain": "#9333ea",
"deployment": "#16a34a",
"service": "#0f766e",
"capability": "#2563eb",
"interface": "#7c3aed",
"dependency": "#b45309",
"binding": "#be123c",
"library": "#0891b2",
}
_EDGE_STRENGTH = {
"built_from": "medium",
"depends_on": "medium",
"deploys": "strong",
"evidenced_by": "medium",
"exposes": "strong",
"flows_to": "medium",
"governed_by": "medium",
"implements": "medium",
"observed_by": "medium",
"part_of": "weak",
"reads_or_writes": "medium",
"provides": "strong",
"available_via": "medium",
"consumes": "medium",
"uses_interface": "medium",
"declares": "weak",
"deployed_as": "medium",
"exposes_port": "strong",
"listens_on": "strong",
"names_endpoint": "medium",
"opens_port": "strong",
"routes_to_port": "medium",
"routes_to_service": "strong",
"resolves_to": "medium",
"runs_on": "strong",
"owns_deployment": "weak",
}
def fabric_graph_explorer_manifest(base_url: str = "") -> dict[str, Any]:
"""Return the host manifest for the reusable graph explorer shell."""
return {
"apiVersion": "railiance.fabric/v1alpha1",
"kind": "GraphExplorerManifest",
"id": "railiance-fabric.registry-map",
"title": "Railiance Fabric Registry Map",
"description": "Manifest for exploring registered Fabric ecosystem entities.",
"engine": {
"suggested_package": "graph-explorer-engine",
"preferred_library": "cytoscape",
"display_state_owner": "host-or-engine",
},
"endpoints": {
"graph": {
"method": "GET",
"url": f"{base_url}/exports/graph-explorer",
},
"manifest": {
"method": "GET",
"url": f"{base_url}/exports/graph-explorer/manifest",
},
},
"identity": {
"node_id_field": "id",
"stable_key_field": "stableKey",
"edge_id_field": "id",
"source_field": "source",
"target_field": "target",
},
"layers": [
{
"id": layer,
"label": layer.replace("_", " ").title(),
"order": index,
"color": _LAYER_COLORS[layer],
}
for index, layer in enumerate(LAYER_ORDER)
],
"grouping_fields": [
"domain",
"repo",
"layer",
"kind",
"environment",
"deploymentEnvironment",
"deploymentScenario",
"routingAuthority",
"accessZone",
"policyAuthority",
"exposureClass",
"serverHost",
"lifecycle",
"unresolved",
],
"search_fields": [
"id",
"stableKey",
"label",
"name",
"description",
"repo",
"domain",
"environment",
"deploymentEnvironment",
"deploymentScenario",
"routingAuthority",
"accessZone",
"policyAuthority",
"exposureClass",
"serverHost",
"routeHost",
"routeHostname",
"routePort",
"routeProtocol",
"kind",
"layer",
"edgeType",
"canonicalType",
"canonCategory",
"evidenceState",
],
"filter": {
"actions": list(DISPLAY_STATES),
"fields": [
{"id": "kind", "label": "Kind", "type": "string"},
{"id": "layer", "label": "Node Type", "type": "string"},
{"id": "repo", "label": "Repo", "type": "string"},
{"id": "domain", "label": "Domain", "type": "string"},
{"id": "environment", "label": "Environment", "type": "string"},
{"id": "deploymentEnvironment", "label": "Deployment Environment", "type": "string"},
{"id": "deploymentScenario", "label": "Deployment Scenario", "type": "string"},
{"id": "routingAuthority", "label": "Routing Authority", "type": "string"},
{"id": "accessZone", "label": "Access Zone", "type": "string"},
{"id": "policyAuthority", "label": "Policy Authority", "type": "string"},
{"id": "exposureClass", "label": "Exposure Class", "type": "string"},
{"id": "serverHost", "label": "Server Host", "type": "string"},
{"id": "routeHost", "label": "Route Host", "type": "string"},
{"id": "routePort", "label": "Route Port", "type": "number"},
{"id": "lifecycle", "label": "Lifecycle", "type": "string"},
{"id": "reviewState", "label": "Review State", "type": "string"},
{"id": "unresolved", "label": "Unresolved", "type": "boolean"},
{"id": "edgeType", "label": "Edge Type", "type": "string"},
{"id": "canonicalType", "label": "Canonical Edge", "type": "string"},
{"id": "canonCategory", "label": "Canon Category", "type": "string"},
{"id": "evidenceState", "label": "Evidence State", "type": "string"},
{"id": "displayOnly", "label": "Display Only", "type": "boolean"},
{"id": "strength", "label": "Strength", "type": "string"},
{"id": "sameLayer", "label": "Same Layer", "type": "boolean"},
{"id": "text", "label": "Text", "type": "string"},
],
},
"visual_encodings": {
"node_color": "layer",
"node_shape": "kind",
"node_size": "confidence",
"edge_width": "strength",
"edge_style": "sameLayer",
"edge_opacity": "displayState",
},
"detail": {
"node_fields": [
"name",
"kind",
"layer",
"repo",
"domain",
"lifecycle",
"deploymentEnvironment",
"deploymentScenario",
"routingAuthority",
"accessZone",
"policyAuthority",
"exposureClass",
"routeHost",
"routePort",
"canonCategory",
"evidenceState",
"unresolved",
"description",
"sourceReferences",
"deepLinks",
],
"edge_fields": [
"edgeType",
"canonicalType",
"displayOnly",
"evidenceState",
"strength",
"source",
"target",
"sourceLayer",
"targetLayer",
"sameLayer",
"deepLinks",
],
},
"modes": [
{
"id": "by-fabric",
"label": "Fabric",
"description": "Group and filter by fabric/accountability fields.",
},
{
"id": "by-deployment-environment",
"label": "Environment",
"description": "Group and filter by deployment environment.",
},
{
"id": "by-deployment-scenario",
"label": "Scenario",
"description": "Group and filter by deployment scenario.",
},
{
"id": "by-routing-authority",
"label": "Routing",
"description": "Group and filter by routing authority.",
},
{
"id": "by-access-zone",
"label": "Access Zone",
"description": "Group and filter by intended access zone.",
},
{
"id": "full",
"label": "Full",
"description": "Show the complete registered ecosystem graph.",
},
{
"id": "selected-path",
"label": "Selected Path",
"description": "Show selected nodes with predecessor and successor context.",
"requires_selection": True,
},
{
"id": "neighborhood",
"label": "Neighborhood",
"description": "Show the selected node neighborhood by configurable depth.",
"requires_selection": True,
},
{
"id": "onboarding-gaps",
"label": "Onboarding Gaps",
"description": "Highlight registered repos without accepted Fabric graph snapshots.",
},
{
"id": "unresolved",
"label": "Unresolved",
"description": "Highlight dependencies that have no accepted provider binding.",
},
],
"profile_persistence": "local",
"shareable_state": {
"url_parameters": True,
"profile_id": True,
"state_blob": True,
},
}
def fabric_graph_explorer_payload(
graph: dict[str, Any],
repositories: list[dict[str, Any]] | None = None,
snapshot_repo_slugs: set[str] | None = None,
) -> dict[str, Any]:
"""Project a Fabric graph export into the shared graph explorer payload."""
source_nodes = [node for node in graph.get("nodes", []) if isinstance(node, dict)]
source_edges = [edge for edge in graph.get("edges", []) if isinstance(edge, dict)]
repositories = repositories or []
snapshot_repo_slugs = snapshot_repo_slugs or set()
source_repo_slugs = {
str(node.get("repo", "")).strip()
for node in source_nodes
if str(node.get("repo", "")).strip()
}
registered_repo_slugs = {
str(repo.get("slug", "")).strip()
for repo in repositories
if str(repo.get("slug", "")).strip()
}
repo_slugs = set(source_repo_slugs)
repo_slugs.update(registered_repo_slugs)
repo_slugs.update(snapshot_repo_slugs)
repo_slugs.discard("")
unresolved = _unresolved_dependency_ids(source_nodes, source_edges)
elements: list[dict[str, Any]] = []
repository_index = {str(repo.get("slug", "")): repo for repo in repositories}
source_repository_node_ids = {
str(node.get("id", "")): f"repo:{str(node.get('repo', '')).strip()}"
for node in source_nodes
if str(node.get("kind", "")) == "Repository"
and str(node.get("id", ""))
and str(node.get("repo", "")).strip()
}
for slug in sorted(repo_slugs):
repo = repository_index.get(slug, {})
has_snapshot = slug in snapshot_repo_slugs or slug in source_repo_slugs
is_registered = slug in registered_repo_slugs
repo_id = f"repo:{slug}"
elements.append(
{
"data": {
"id": repo_id,
"stableKey": repo_id,
"kind": "Repository",
"layer": "repository",
"label": str(repo.get("name") or slug),
"name": str(repo.get("name") or slug),
"description": (
"Registered repository with accepted Fabric graph snapshot."
if has_snapshot
else "Registered repository without an accepted Fabric graph snapshot."
),
"repo": slug,
"domain": "railiance",
"lifecycle": "active" if has_snapshot else "registered-only",
"reviewState": "accepted" if has_snapshot else "candidate",
"freshnessState": "current" if has_snapshot else "missing",
"unresolved": is_registered and not has_snapshot,
"confidence": 1.0 if has_snapshot else 0.3,
"visualSize": 56 if has_snapshot else 42,
"ownership": "registry",
"displayState": "show",
"visibilitySource": "default",
"visibilityReason": "default",
"sourceReferences": [],
"deepLinks": _repository_links(repo),
},
"classes": "repository accepted" if has_snapshot else "repository candidate unresolved",
}
)
for node in source_nodes:
node_id = str(node.get("id", ""))
if not node_id:
continue
source_kind = str(node.get("kind", ""))
if source_kind == "Repository":
continue
attributes = node.get("attributes") if isinstance(node.get("attributes"), dict) else {}
overlay_data = _overlay_data(
node.get("deployment_overlay") if isinstance(node.get("deployment_overlay"), dict) else {},
attributes,
)
kind = _presentation_kind(source_kind, attributes)
layer = _layer_for_kind(kind)
is_unresolved = node_id in unresolved
review_state = str(attributes.get("discovery_review_state") or "accepted")
confidence = attributes.get("discovery_confidence")
elements.append(
{
"data": {
"id": node_id,
"stableKey": node_id,
"kind": kind,
"layer": layer,
"label": str(node.get("name") or node_id),
"name": str(node.get("name") or node_id),
"description": _node_description(kind, attributes),
"repo": str(node.get("repo", "")),
"domain": str(node.get("domain", "")),
"lifecycle": str(node.get("lifecycle", "")),
"environment": overlay_data["deploymentEnvironment"],
**overlay_data,
"canonCategory": str(
node.get("canon_category") or attributes.get("canon_category") or ""
),
"canonAnchor": str(node.get("canon_anchor") or attributes.get("canon_anchor") or ""),
"mappingFit": str(node.get("mapping_fit") or attributes.get("mapping_fit") or ""),
"evidenceState": str(
node.get("evidence_state") or attributes.get("evidence_state") or ""
),
"reviewState": review_state,
"freshnessState": "current",
"unresolved": is_unresolved,
"confidence": (
float(confidence)
if isinstance(confidence, (int, float))
else 0.45 if is_unresolved else 1.0
),
"visualSize": 34 if layer == "binding" else 46 if is_unresolved else 50,
"ownership": str(attributes.get("owner") or attributes.get("discovery_origin") or "repo"),
"attributes": {**attributes, "source_kind": source_kind} if source_kind != kind else attributes,
"discovery": {
"stableKey": attributes.get("discovery_stable_key", ""),
"origin": attributes.get("discovery_origin", ""),
"reviewState": attributes.get("discovery_review_state", ""),
"confidence": attributes.get("discovery_confidence", ""),
"provenance": attributes.get("discovery_provenance", []),
},
"displayState": "show",
"visibilitySource": "default",
"visibilityReason": "default",
"sourceReferences": _source_references(node),
"deepLinks": _node_links(node_id, attributes),
},
"classes": " ".join(
part
for part in (layer, kind, "unresolved" if is_unresolved else review_state)
if part
),
}
)
node_layers = _node_data_index(elements, "layer")
node_repos = _node_data_index(elements, "repo")
node_kinds = _node_data_index(elements, "kind")
edge_index = _append_infrastructure_elements(
source_nodes,
elements,
node_layers,
node_repos,
repo_slugs,
)
for edge in source_edges:
source = source_repository_node_ids.get(str(edge.get("from", "")), str(edge.get("from", "")))
target = source_repository_node_ids.get(str(edge.get("to", "")), str(edge.get("to", "")))
edge_type = _presentation_edge_type(str(edge.get("type", "")), source, target, node_kinds)
if not source or not target:
continue
elements.append(
_edge_element(
edge_index,
source,
target,
edge_type,
node_layers,
node_repos,
**_edge_metadata(edge, edge_type),
)
)
edge_index += 1
for slug in sorted(repo_slugs):
repo_id = f"repo:{slug}"
for node in source_nodes:
node_id = str(node.get("id", ""))
if str(node.get("kind", "")) == "Repository":
continue
if str(node.get("repo", "")) != slug or not node_id:
continue
elements.append(
_edge_element(
edge_index,
repo_id,
node_id,
"declares",
node_layers,
node_repos,
display_only=True,
)
)
edge_index += 1
visible_nodes = [element for element in elements if "source" not in element["data"]]
visible_edges = [element for element in elements if "source" in element["data"]]
return {
"apiVersion": "railiance.fabric/v1alpha1",
"kind": "GraphExplorerPayload",
"manifest_id": "railiance-fabric.registry-map",
"generated_at": _utc_now(),
"repository": {"slug": "registry", "name": "Railiance Fabric Registry"},
"mode": "full",
"profile": None,
"metrics": {
"node_count": len(visible_nodes),
"edge_count": len(visible_edges),
"hidden_count": 0,
"blurred_count": 0,
"registered_repo_count": len(registered_repo_slugs),
"repo_node_count": len(repo_slugs),
"deployment_node_count": sum(1 for element in visible_nodes if element["data"].get("kind") == "Deployment"),
"server_node_count": sum(1 for element in visible_nodes if element["data"].get("kind") == "Server"),
"registered_only_repo_count": sum(
1
for element in visible_nodes
if element["data"].get("repo") in registered_repo_slugs
and element["data"].get("lifecycle") == "registered-only"
),
"unresolved_count": len(unresolved),
},
"filter": {
"rules": [],
"manual_overrides": {},
"orphaned_overrides": [],
"precedence": (
"later rules override earlier rules; remove excludes matching elements "
"from layout and wins over manual overrides; otherwise manual overrides win last"
),
"connected_edge_behavior": (
"edges connected to hidden nodes are hidden; edges connected to removed nodes are removed"
),
},
"elements": elements,
"hidden_elements": [],
}
def _utc_now() -> str:
return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
def _layer_for_kind(kind: str) -> str:
return _KIND_LAYER.get(kind, kind.lower() or "unknown")
def _presentation_kind(kind: str, attributes: dict[str, Any]) -> str:
if kind != "Server":
return kind
host = str(attributes.get("host") or "").strip().lower()
runtime_type = str(attributes.get("runtime_target_type") or attributes.get("server_type") or "")
if runtime_type == "kubernetes-service-dns" or host.endswith(".svc.cluster.local"):
return "RuntimeService"
if runtime_type in {"declared-endpoint", "ingress-host"} and _looks_like_domain(host):
return "ApplicationEndpoint"
return kind
def _presentation_edge_type(edge_type: str, source: str, target: str, node_kinds: dict[str, str]) -> str:
if edge_type == "resolves_to":
target_kind = node_kinds.get(target, "")
if target_kind == "ApplicationEndpoint":
return "names_endpoint"
if target_kind == "RuntimeService":
return "routes_to_service"
if edge_type == "opens_port" and node_kinds.get(source, "") in {"ApplicationEndpoint", "RuntimeService"}:
return "listens_on"
return edge_type
def _edge_metadata(edge: dict[str, Any], edge_type: str) -> dict[str, Any]:
canon_mapping = edge_canon_mapping(edge_type)
attributes = edge.get("attributes") if isinstance(edge.get("attributes"), dict) else {}
return {
"canonical_type": str(edge.get("canonical_type") or canon_mapping.canonical_type),
"canon_anchor": str(edge.get("canon_anchor") or canon_mapping.canon_anchor),
"mapping_fit": str(edge.get("mapping_fit") or canon_mapping.fit),
"display_only": bool(edge.get("display_only", canon_mapping.display_only)),
"evidence_state": str(edge.get("evidence_state") or "declared"),
"deployment_overlay": normalize_deployment_overlay(
edge.get("deployment_overlay") if isinstance(edge.get("deployment_overlay"), dict) else {},
attributes,
),
}
def _edge_strength(edge_type: str) -> str:
if edge_type.startswith("binds:"):
status = edge_type.split(":", 1)[1]
if status in {"missing", "disputed"}:
return "weak"
if status in {"accepted", "exact"}:
return "strong"
return "medium"
return _EDGE_STRENGTH.get(edge_type, "medium")
def _edge_width(strength: str) -> int:
return {"weak": 1, "medium": 3, "strong": 5}.get(strength, 2)
def _node_data_index(elements: list[dict[str, Any]], field: str) -> dict[str, str]:
index: dict[str, str] = {}
for element in elements:
data = element.get("data", {})
if isinstance(data, dict) and "source" not in data:
node_id = str(data.get("id") or "")
if node_id:
index[node_id] = str(data.get(field) or "")
return index
def _runtime_node_indexes(source_nodes: list[dict[str, Any]]) -> tuple[dict[str, str], dict[str, str]]:
servers_by_host: dict[str, str] = {}
ports_by_endpoint: dict[str, str] = {}
for node in source_nodes:
node_id = str(node.get("id") or "")
if not node_id:
continue
attributes = node.get("attributes") if isinstance(node.get("attributes"), dict) else {}
kind = _presentation_kind(str(node.get("kind") or ""), attributes)
host = _normalize_endpoint_host(str(attributes.get("host") or ""))
if kind == "Server" and host:
servers_by_host.setdefault(host, node_id)
if kind == "NetworkPort" and host:
port = _int_value(attributes.get("port"))
protocol = _normalize_protocol(str(attributes.get("protocol") or "tcp"))
if port is not None:
ports_by_endpoint.setdefault(_endpoint_key(host, port, protocol), node_id)
return servers_by_host, ports_by_endpoint
def _append_infrastructure_elements(
source_nodes: list[dict[str, Any]],
elements: list[dict[str, Any]],
node_layers: dict[str, str],
node_repos: dict[str, str],
repo_slugs: set[str],
) -> int:
edge_index = 0
endpoints_by_service = _endpoints_by_service(source_nodes)
server_ids_by_host, port_ids_by_endpoint = _runtime_node_indexes(source_nodes)
generated_edge_keys: set[tuple[str, str, str]] = set()
def append_edge(source: str, target: str, edge_type: str, *, display_only: bool = True) -> None:
nonlocal edge_index
if not source or not target:
return
key = (source, edge_type, target)
if key in generated_edge_keys:
return
generated_edge_keys.add(key)
elements.append(
_edge_element(
edge_index,
source,
target,
edge_type,
node_layers,
node_repos,
display_only=display_only,
)
)
edge_index += 1
service_nodes = sorted(
(node for node in source_nodes if node.get("kind") == "ServiceDeclaration"),
key=lambda node: str(node.get("id", "")),
)
for service in service_nodes:
service_id = str(service.get("id", ""))
if not service_id:
continue
attributes = service.get("attributes") if isinstance(service.get("attributes"), dict) else {}
environments = _environments(attributes)
repo = str(service.get("repo") or "")
service_name = str(service.get("name") or service_id)
service_endpoints = endpoints_by_service.get(service_id, [])
for environment in environments:
deployment_id = f"deployment:{service_id}:{environment}"
matching_endpoints = [
endpoint
for endpoint in service_endpoints
if _environment_matches(environment, endpoint["environments"])
]
deployment_overlay = normalize_deployment_overlay(
attributes,
{"deployment_environment": environment},
*[endpoint["deployment_overlay"] for endpoint in matching_endpoints],
)
overlay_data = _overlay_data(deployment_overlay)
server_hosts = sorted({endpoint["host"] for endpoint in matching_endpoints})
deployment_data = {
"id": deployment_id,
"stableKey": deployment_id,
"kind": "Deployment",
"layer": "deployment",
"label": f"{service_name} [{environment}]",
"name": f"{service_name} deployment ({environment})",
"description": f"{service_name} deployment in {environment}.",
"repo": repo,
"domain": str(service.get("domain") or ""),
"lifecycle": str(service.get("lifecycle") or ""),
"environment": environment,
**overlay_data,
"serviceId": service_id,
"serverHosts": server_hosts,
"reviewState": "accepted",
"freshnessState": "current",
"unresolved": False,
"confidence": 0.85 if server_hosts else 0.65,
"visualSize": 42,
"ownership": str(attributes.get("owner") or "repo"),
"attributes": {
"service_id": service_id,
"environment": environment,
"server_hosts": server_hosts,
"source_service": service_id,
},
"displayState": "show",
"visibilitySource": "default",
"visibilityReason": "default",
"sourceReferences": _source_references(service),
"deepLinks": {"service": f"/graph/nodes/{service_id}"},
}
elements.append({"data": deployment_data, "classes": "deployment accepted"})
node_layers[deployment_id] = "deployment"
node_repos[deployment_id] = repo
append_edge(service_id, deployment_id, "deployed_as")
if repo and repo in repo_slugs:
repo_id = f"repo:{repo}"
append_edge(repo_id, deployment_id, "owns_deployment")
for endpoint in matching_endpoints:
host = endpoint["host"]
port = endpoint["port"]
protocol = endpoint["protocol"]
endpoint_overlay_data = _overlay_data(
endpoint["deployment_overlay"],
{"deployment_environment": environment},
)
server_id = server_ids_by_host.get(host)
endpoint_key = _endpoint_key(host, port, protocol)
port_id = port_ids_by_endpoint.get(endpoint_key)
port_was_generated = port_id is None
if server_id is None and _looks_like_machine_address(host):
server_id = _server_id(host)
server_ids_by_host[host] = server_id
server_data = {
"id": server_id,
"stableKey": server_id,
"kind": "Server",
"layer": "server",
"label": host,
"name": host,
"description": f"Server inferred from endpoint {endpoint['url']}.",
"repo": "",
"domain": str(service.get("domain") or ""),
"lifecycle": "active",
"environment": environment,
**endpoint_overlay_data,
"serverHost": host,
"reviewState": "accepted",
"freshnessState": "current",
"unresolved": False,
"confidence": 0.7,
"visualSize": 48,
"ownership": "inferred",
"attributes": {
"host": host,
"source_interface_id": endpoint["interface_id"],
"endpoint_url": endpoint["url"],
"deployment_overlay": endpoint_overlay_data["deploymentOverlay"],
},
"displayState": "show",
"visibilitySource": "default",
"visibilityReason": "default",
"sourceReferences": [{"label": "Endpoint interface", "ref": endpoint["interface_id"]}],
"deepLinks": {"interface": f"/graph/nodes/{endpoint['interface_id']}"},
}
elements.append({"data": server_data, "classes": "server accepted inferred"})
node_layers[server_id] = "server"
node_repos[server_id] = ""
if port_id is None:
port_id = _port_id(host, port, protocol)
port_ids_by_endpoint[endpoint_key] = port_id
port_data = {
"id": port_id,
"stableKey": port_id,
"kind": "NetworkPort",
"layer": "network",
"label": f"{host}:{port}/{protocol}",
"name": f"{host}:{port}/{protocol}",
"description": f"Port inferred from endpoint {endpoint['url']}.",
"repo": "",
"domain": str(service.get("domain") or ""),
"lifecycle": "active",
"environment": environment,
**endpoint_overlay_data,
"serverHost": host,
"reviewState": "accepted",
"freshnessState": "current",
"unresolved": False,
"confidence": 0.7,
"visualSize": 42,
"ownership": "inferred",
"attributes": {
"host": host,
"port": port,
"protocol": protocol,
"source_interface_id": endpoint["interface_id"],
"endpoint_url": endpoint["url"],
"deployment_overlay": endpoint_overlay_data["deploymentOverlay"],
},
"displayState": "show",
"visibilitySource": "default",
"visibilityReason": "default",
"sourceReferences": [{"label": "Endpoint interface", "ref": endpoint["interface_id"]}],
"deepLinks": {"interface": f"/graph/nodes/{endpoint['interface_id']}"},
}
elements.append({"data": port_data, "classes": "network accepted inferred"})
node_layers[port_id] = "network"
node_repos[port_id] = ""
if server_id:
append_edge(deployment_id, server_id, "runs_on")
if port_was_generated:
append_edge(server_id, port_id, "opens_port")
append_edge(deployment_id, port_id, "exposes_port")
return edge_index
def _endpoints_by_service(source_nodes: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
endpoints: dict[str, list[dict[str, Any]]] = {}
for node in source_nodes:
if node.get("kind") != "InterfaceDeclaration":
continue
attributes = node.get("attributes") if isinstance(node.get("attributes"), dict) else {}
endpoint = attributes.get("endpoint") if isinstance(attributes.get("endpoint"), dict) else {}
url = str(endpoint.get("url") or "").strip()
parsed = _parse_endpoint_url(url)
service_id = str(attributes.get("service_id") or "")
if not service_id or not parsed:
continue
host, port, protocol = parsed
endpoints.setdefault(service_id, []).append(
{
"host": host,
"port": port,
"protocol": protocol,
"url": url,
"interface_id": str(node.get("id") or ""),
"environments": _environments(attributes),
"deployment_overlay": normalize_deployment_overlay(
attributes,
endpoint,
{
"host": host,
"port": port,
"protocol": protocol,
"route": url,
},
),
}
)
return endpoints
def _environments(attributes: dict[str, Any]) -> list[str]:
environments = [
str(environment)
for environment in attributes.get("environments", [])
if str(environment).strip()
]
return environments or ["all"]
def _environment_matches(deployment_environment: str, endpoint_environments: list[str]) -> bool:
return (
deployment_environment == "all"
or "all" in endpoint_environments
or deployment_environment in endpoint_environments
)
def _endpoint_host(url: str) -> str:
parsed = _parse_endpoint_url(url)
return parsed[0] if parsed else ""
def _parse_endpoint_url(url: str) -> tuple[str, int, str] | None:
if not url:
return None
parsed = urlparse(url)
host = _normalize_endpoint_host(parsed.hostname or parsed.netloc or parsed.path.split("/", 1)[0])
try:
port = parsed.port
except ValueError:
port = None
scheme = str(parsed.scheme or "").lower()
port = port or {"http": 80, "https": 443, "postgres": 5432}.get(scheme)
if not host or port is None:
return None
return host, port, "tcp"
def _normalize_endpoint_host(host: str) -> str:
value = str(host or "").strip().lower().strip("[]")
if value in {"0.0.0.0", "::"}:
return "localhost"
return value
def _endpoint_key(host: str, port: int, protocol: str) -> str:
return f"{_normalize_endpoint_host(host)}:{port}/{_normalize_protocol(protocol)}"
def _normalize_protocol(protocol: str) -> str:
return str(protocol or "tcp").strip().lower() or "tcp"
def _int_value(value: object) -> int | None:
if isinstance(value, bool):
return None
if isinstance(value, int):
return value
text = str(value or "").strip()
if text.isdecimal():
return int(text)
return None
def _looks_like_domain(host: str) -> bool:
value = _normalize_endpoint_host(host)
if not value or value == "localhost":
return False
if all(part.isdecimal() for part in value.split(".") if part):
return False
return "." in value
def _looks_like_machine_address(host: str) -> bool:
value = _normalize_endpoint_host(host)
if value in {"localhost", "127.0.0.1"}:
return True
try:
ipaddress.ip_address(value)
except ValueError:
return False
return True
def _server_id(host: str) -> str:
key = sub(r"[^A-Za-z0-9._:+-]+", "-", host.lower()).strip("-")
return f"server:{key or 'unknown'}"
def _port_id(host: str, port: int, protocol: str) -> str:
key = sub(r"[^A-Za-z0-9._:+-]+", "-", _endpoint_key(host, port, protocol)).strip("-")
return f"port:{key or 'unknown'}"
def _edge_element(
edge_index: int,
source: str,
target: str,
edge_type: str,
node_layers: dict[str, str],
node_repos: dict[str, str],
*,
canonical_type: str = "",
canon_anchor: str = "",
mapping_fit: str = "",
display_only: bool = False,
evidence_state: str = "",
deployment_overlay: dict[str, Any] | None = None,
) -> dict[str, Any]:
source_layer = node_layers.get(source, "unknown")
target_layer = node_layers.get(target, "unknown")
source_repo = node_repos.get(source, "")
target_repo = node_repos.get(target, "")
same_repo = bool(source_repo and source_repo == target_repo)
canon_mapping = edge_canon_mapping(edge_type)
canonical_type = canonical_type or canon_mapping.canonical_type
canon_anchor = canon_anchor or canon_mapping.canon_anchor
mapping_fit = mapping_fit or canon_mapping.fit
display_only = display_only or canon_mapping.display_only
evidence_state = evidence_state or "declared"
strength = _edge_strength(edge_type)
layout = _layout_hints(edge_type, source_layer, target_layer, same_repo)
edge_id = f"edge:{edge_index}:{source}:{edge_type}:{target}"
overlay_data = _overlay_data(deployment_overlay or {})
return {
"data": {
"id": edge_id,
"stableKey": edge_id,
"kind": "edge",
"layer": "relationship",
"label": edge_type,
"source": source,
"target": target,
"sourceLayer": source_layer,
"targetLayer": target_layer,
"sourceRepo": source_repo,
"targetRepo": target_repo,
"edgeType": edge_type,
"dependencyType": edge_type,
"canonicalType": canonical_type,
"canonAnchor": canon_anchor,
"mappingFit": mapping_fit,
"displayOnly": display_only,
"evidenceState": evidence_state,
**overlay_data,
"strength": strength,
"edgeWidth": _edge_width(strength),
"sameLayer": source_layer == target_layer,
"sameRepo": same_repo,
"layoutAffinity": layout["affinity"],
"layoutIdealLength": layout["ideal_length"],
"layoutElasticity": layout["elasticity"],
"reviewState": "accepted",
"freshnessState": "current",
"displayState": "show",
"visibilitySource": "default",
"visibilityReason": "default",
"deepLinks": {},
},
"classes": " ".join(
part
for part in (
edge_type.replace(":", "-"),
strength,
str(layout["affinity"]),
"display-only" if display_only else "canonical",
"same-layer" if source_layer == target_layer else "",
"same-repo" if same_repo else "",
)
if part
),
}
def _overlay_data(*sources: object) -> dict[str, Any]:
return graph_explorer_overlay_fields(normalize_deployment_overlay(*sources))
def _layout_hints(
edge_type: str,
source_layer: str,
target_layer: str,
same_repo: bool,
) -> dict[str, int | str]:
if edge_type == "runs_on":
return {"affinity": "deployment-server", "ideal_length": 42, "elasticity": 240}
if edge_type == "deployed_as":
return {"affinity": "service-deployment", "ideal_length": 58, "elasticity": 180}
if edge_type == "owns_deployment":
return {"affinity": "repo-deployment", "ideal_length": 150, "elasticity": 28}
if edge_type == "declares":
return {"affinity": "repo-declaration", "ideal_length": 88, "elasticity": 110}
if same_repo:
return {"affinity": "same-repo", "ideal_length": 72, "elasticity": 150}
if "repository" in {source_layer, target_layer}:
return {"affinity": "repo-loose", "ideal_length": 130, "elasticity": 45}
return {"affinity": "cross-repo", "ideal_length": 190, "elasticity": 22}
def _unresolved_dependency_ids(
nodes: list[dict[str, Any]],
edges: list[dict[str, Any]],
) -> set[str]:
dependency_ids = {
str(node.get("id", ""))
for node in nodes
if node.get("kind") == "DependencyDeclaration" and str(node.get("id", ""))
}
resolved: set[str] = set()
unresolved: set[str] = set()
for edge in edges:
edge_type = str(edge.get("type", ""))
source = str(edge.get("from", ""))
if source not in dependency_ids or not edge_type.startswith("binds:"):
continue
status = edge_type.split(":", 1)[1]
if status in {"accepted", "candidate", "exact", "compatible"}:
resolved.add(source)
if status in {"missing", "disputed"}:
unresolved.add(source)
return (dependency_ids - resolved) | unresolved
def _node_description(kind: str, attributes: object) -> str:
if not isinstance(attributes, dict):
return ""
description = str(attributes.get("description", ""))
if description:
return description
if kind == "CapabilityDeclaration":
return str(attributes.get("capability_type", ""))
if kind == "InterfaceDeclaration":
version = str(attributes.get("version", ""))
interface_type = str(attributes.get("interface_type", ""))
return " ".join(part for part in (interface_type, version) if part)
if kind == "DependencyDeclaration":
return str(
attributes.get("requires_capability_type")
or attributes.get("requires_capability_id")
or attributes.get("interface_type")
or ""
)
if kind == "BindingAssertion":
return str(attributes.get("status", ""))
if kind == "RuntimeService":
return str(attributes.get("runtime_target_type") or attributes.get("service_type") or "runtime service")
if kind == "ApplicationEndpoint":
return str(attributes.get("endpoint_url") or attributes.get("domain") or "application endpoint")
if kind == "NetworkPort":
host = str(attributes.get("host") or "")
port = str(attributes.get("port") or "")
protocol = str(attributes.get("protocol") or "")
return " ".join(part for part in (host, port, protocol) if part)
if kind == "DomainName":
return str(attributes.get("domain") or "")
return ""
def _source_references(node: dict[str, Any]) -> list[dict[str, str]]:
attributes = node.get("attributes")
references: list[dict[str, str]] = []
if isinstance(attributes, dict):
source_path = str(attributes.get("source_path") or "")
if source_path:
references.append({"label": "Declaration", "path": source_path})
for source in attributes.get("source_links", []):
if isinstance(source, dict):
references.append({key: str(value) for key, value in source.items()})
for anchor in attributes.get("discovery_source_anchors", []):
if isinstance(anchor, dict):
reference = {
"label": f"Discovery {anchor.get('source_kind', 'source')}",
}
for key in ("path", "url", "ref", "json_pointer"):
if anchor.get(key):
reference[key] = str(anchor[key])
references.append(reference)
return references
def _node_links(node_id: str, attributes: dict[str, Any] | None = None) -> dict[str, str]:
links = {"registry": f"/graph/nodes/{node_id}"}
source_path = str((attributes or {}).get("source_path") or "")
if source_path:
links["sourceFile"] = source_path
return links
def _repository_links(repository: dict[str, Any]) -> dict[str, str]:
slug = str(repository.get("slug", ""))
links = {"registry": f"/repositories/{slug}"} if slug else {}
state_hub_repo_id = str(repository.get("state_hub_repo_id") or "")
if state_hub_repo_id:
links["stateHubRepo"] = f"/repos/by-id/{state_hub_repo_id}"
return links