from __future__ import annotations import ipaddress from datetime import datetime, timezone from re import sub from typing import Any from urllib.parse import urlparse from .canon import edge_canon_mapping from .deployment_overlay import graph_explorer_overlay_fields, normalize_deployment_overlay DISPLAY_STATES = ("show", "blur", "hide", "highlight", "remove") LAYER_ORDER = ( "repository", "server", "runtime_service", "application", "network", "domain", "deployment", "service", "capability", "interface", "dependency", "binding", "library", ) _KIND_LAYER = { "Repository": "repository", "Server": "server", "RuntimeService": "runtime_service", "ApplicationEndpoint": "application", "NetworkPort": "network", "DomainName": "domain", "Deployment": "deployment", "ServiceDeclaration": "service", "CapabilityDeclaration": "capability", "InterfaceDeclaration": "interface", "DependencyDeclaration": "dependency", "BindingAssertion": "binding", "Library": "library", } _LAYER_COLORS = { "repository": "#475569", "server": "#334155", "runtime_service": "#0369a1", "application": "#0f766e", "network": "#64748b", "domain": "#9333ea", "deployment": "#16a34a", "service": "#0f766e", "capability": "#2563eb", "interface": "#7c3aed", "dependency": "#b45309", "binding": "#be123c", "library": "#0891b2", } _EDGE_STRENGTH = { "built_from": "medium", "depends_on": "medium", "deploys": "strong", "evidenced_by": "medium", "exposes": "strong", "flows_to": "medium", "governed_by": "medium", "implements": "medium", "observed_by": "medium", "part_of": "weak", "reads_or_writes": "medium", "provides": "strong", "available_via": "medium", "consumes": "medium", "uses_interface": "medium", "declares": "weak", "deployed_as": "medium", "exposes_port": "strong", "listens_on": "strong", "names_endpoint": "medium", "opens_port": "strong", "routes_to_port": "medium", "routes_to_service": "strong", "resolves_to": "medium", "runs_on": "strong", "owns_deployment": "weak", } def fabric_graph_explorer_manifest(base_url: str = "") -> dict[str, Any]: """Return the host manifest for the reusable graph explorer shell.""" return { "apiVersion": "railiance.fabric/v1alpha1", "kind": "GraphExplorerManifest", "id": "railiance-fabric.registry-map", "title": "Railiance Fabric Registry Map", "description": "Manifest for exploring registered Fabric ecosystem entities.", "engine": { "suggested_package": "graph-explorer-engine", "preferred_library": "cytoscape", "display_state_owner": "host-or-engine", }, "endpoints": { "graph": { "method": "GET", "url": f"{base_url}/exports/graph-explorer", }, "manifest": { "method": "GET", "url": f"{base_url}/exports/graph-explorer/manifest", }, }, "identity": { "node_id_field": "id", "stable_key_field": "stableKey", "edge_id_field": "id", "source_field": "source", "target_field": "target", }, "layers": [ { "id": layer, "label": layer.replace("_", " ").title(), "order": index, "color": _LAYER_COLORS[layer], } for index, layer in enumerate(LAYER_ORDER) ], "grouping_fields": [ "domain", "repo", "layer", "kind", "environment", "deploymentEnvironment", "deploymentScenario", "routingAuthority", "accessZone", "policyAuthority", "exposureClass", "serverHost", "lifecycle", "unresolved", ], "search_fields": [ "id", "stableKey", "label", "name", "description", "repo", "domain", "environment", "deploymentEnvironment", "deploymentScenario", "routingAuthority", "accessZone", "policyAuthority", "exposureClass", "serverHost", "routeHost", "routeHostname", "routePort", "routeProtocol", "kind", "layer", "edgeType", "canonicalType", "canonCategory", "evidenceState", ], "filter": { "actions": list(DISPLAY_STATES), "fields": [ {"id": "kind", "label": "Kind", "type": "string"}, {"id": "layer", "label": "Node Type", "type": "string"}, {"id": "repo", "label": "Repo", "type": "string"}, {"id": "domain", "label": "Domain", "type": "string"}, {"id": "environment", "label": "Environment", "type": "string"}, {"id": "deploymentEnvironment", "label": "Deployment Environment", "type": "string"}, {"id": "deploymentScenario", "label": "Deployment Scenario", "type": "string"}, {"id": "routingAuthority", "label": "Routing Authority", "type": "string"}, {"id": "accessZone", "label": "Access Zone", "type": "string"}, {"id": "policyAuthority", "label": "Policy Authority", "type": "string"}, {"id": "exposureClass", "label": "Exposure Class", "type": "string"}, {"id": "serverHost", "label": "Server Host", "type": "string"}, {"id": "routeHost", "label": "Route Host", "type": "string"}, {"id": "routePort", "label": "Route Port", "type": "number"}, {"id": "lifecycle", "label": "Lifecycle", "type": "string"}, {"id": "reviewState", "label": "Review State", "type": "string"}, {"id": "unresolved", "label": "Unresolved", "type": "boolean"}, {"id": "edgeType", "label": "Edge Type", "type": "string"}, {"id": "canonicalType", "label": "Canonical Edge", "type": "string"}, {"id": "canonCategory", "label": "Canon Category", "type": "string"}, {"id": "evidenceState", "label": "Evidence State", "type": "string"}, {"id": "displayOnly", "label": "Display Only", "type": "boolean"}, {"id": "strength", "label": "Strength", "type": "string"}, {"id": "sameLayer", "label": "Same Layer", "type": "boolean"}, {"id": "text", "label": "Text", "type": "string"}, ], }, "visual_encodings": { "node_color": "layer", "node_shape": "kind", "node_size": "confidence", "edge_width": "strength", "edge_style": "sameLayer", "edge_opacity": "displayState", }, "detail": { "node_fields": [ "name", "kind", "layer", "repo", "domain", "lifecycle", "deploymentEnvironment", "deploymentScenario", "routingAuthority", "accessZone", "policyAuthority", "exposureClass", "routeHost", "routePort", "canonCategory", "evidenceState", "unresolved", "description", "sourceReferences", "deepLinks", ], "edge_fields": [ "edgeType", "canonicalType", "displayOnly", "evidenceState", "strength", "source", "target", "sourceLayer", "targetLayer", "sameLayer", "deepLinks", ], }, "modes": [ { "id": "by-fabric", "label": "Fabric", "description": "Group and filter by fabric/accountability fields.", }, { "id": "by-deployment-environment", "label": "Environment", "description": "Group and filter by deployment environment.", }, { "id": "by-deployment-scenario", "label": "Scenario", "description": "Group and filter by deployment scenario.", }, { "id": "by-routing-authority", "label": "Routing", "description": "Group and filter by routing authority.", }, { "id": "by-access-zone", "label": "Access Zone", "description": "Group and filter by intended access zone.", }, { "id": "full", "label": "Full", "description": "Show the complete registered ecosystem graph.", }, { "id": "selected-path", "label": "Selected Path", "description": "Show selected nodes with predecessor and successor context.", "requires_selection": True, }, { "id": "neighborhood", "label": "Neighborhood", "description": "Show the selected node neighborhood by configurable depth.", "requires_selection": True, }, { "id": "onboarding-gaps", "label": "Onboarding Gaps", "description": "Highlight registered repos without accepted Fabric graph snapshots.", }, { "id": "unresolved", "label": "Unresolved", "description": "Highlight dependencies that have no accepted provider binding.", }, ], "profile_persistence": "local", "shareable_state": { "url_parameters": True, "profile_id": True, "state_blob": True, }, } def fabric_graph_explorer_payload( graph: dict[str, Any], repositories: list[dict[str, Any]] | None = None, snapshot_repo_slugs: set[str] | None = None, ) -> dict[str, Any]: """Project a Fabric graph export into the shared graph explorer payload.""" source_nodes = [node for node in graph.get("nodes", []) if isinstance(node, dict)] source_edges = [edge for edge in graph.get("edges", []) if isinstance(edge, dict)] repositories = repositories or [] snapshot_repo_slugs = snapshot_repo_slugs or set() source_repo_slugs = { str(node.get("repo", "")).strip() for node in source_nodes if str(node.get("repo", "")).strip() } registered_repo_slugs = { str(repo.get("slug", "")).strip() for repo in repositories if str(repo.get("slug", "")).strip() } repo_slugs = set(source_repo_slugs) repo_slugs.update(registered_repo_slugs) repo_slugs.update(snapshot_repo_slugs) repo_slugs.discard("") unresolved = _unresolved_dependency_ids(source_nodes, source_edges) elements: list[dict[str, Any]] = [] repository_index = {str(repo.get("slug", "")): repo for repo in repositories} source_repository_node_ids = { str(node.get("id", "")): f"repo:{str(node.get('repo', '')).strip()}" for node in source_nodes if str(node.get("kind", "")) == "Repository" and str(node.get("id", "")) and str(node.get("repo", "")).strip() } for slug in sorted(repo_slugs): repo = repository_index.get(slug, {}) has_snapshot = slug in snapshot_repo_slugs or slug in source_repo_slugs is_registered = slug in registered_repo_slugs repo_id = f"repo:{slug}" elements.append( { "data": { "id": repo_id, "stableKey": repo_id, "kind": "Repository", "layer": "repository", "label": str(repo.get("name") or slug), "name": str(repo.get("name") or slug), "description": ( "Registered repository with accepted Fabric graph snapshot." if has_snapshot else "Registered repository without an accepted Fabric graph snapshot." ), "repo": slug, "domain": "railiance", "lifecycle": "active" if has_snapshot else "registered-only", "reviewState": "accepted" if has_snapshot else "candidate", "freshnessState": "current" if has_snapshot else "missing", "unresolved": is_registered and not has_snapshot, "confidence": 1.0 if has_snapshot else 0.3, "visualSize": 56 if has_snapshot else 42, "ownership": "registry", "displayState": "show", "visibilitySource": "default", "visibilityReason": "default", "sourceReferences": [], "deepLinks": _repository_links(repo), }, "classes": "repository accepted" if has_snapshot else "repository candidate unresolved", } ) for node in source_nodes: node_id = str(node.get("id", "")) if not node_id: continue source_kind = str(node.get("kind", "")) if source_kind == "Repository": continue attributes = node.get("attributes") if isinstance(node.get("attributes"), dict) else {} overlay_data = _overlay_data( node.get("deployment_overlay") if isinstance(node.get("deployment_overlay"), dict) else {}, attributes, ) kind = _presentation_kind(source_kind, attributes) layer = _layer_for_kind(kind) is_unresolved = node_id in unresolved review_state = str(attributes.get("discovery_review_state") or "accepted") confidence = attributes.get("discovery_confidence") elements.append( { "data": { "id": node_id, "stableKey": node_id, "kind": kind, "layer": layer, "label": str(node.get("name") or node_id), "name": str(node.get("name") or node_id), "description": _node_description(kind, attributes), "repo": str(node.get("repo", "")), "domain": str(node.get("domain", "")), "lifecycle": str(node.get("lifecycle", "")), "environment": overlay_data["deploymentEnvironment"], **overlay_data, "canonCategory": str( node.get("canon_category") or attributes.get("canon_category") or "" ), "canonAnchor": str(node.get("canon_anchor") or attributes.get("canon_anchor") or ""), "mappingFit": str(node.get("mapping_fit") or attributes.get("mapping_fit") or ""), "evidenceState": str( node.get("evidence_state") or attributes.get("evidence_state") or "" ), "reviewState": review_state, "freshnessState": "current", "unresolved": is_unresolved, "confidence": ( float(confidence) if isinstance(confidence, (int, float)) else 0.45 if is_unresolved else 1.0 ), "visualSize": 34 if layer == "binding" else 46 if is_unresolved else 50, "ownership": str(attributes.get("owner") or attributes.get("discovery_origin") or "repo"), "attributes": {**attributes, "source_kind": source_kind} if source_kind != kind else attributes, "discovery": { "stableKey": attributes.get("discovery_stable_key", ""), "origin": attributes.get("discovery_origin", ""), "reviewState": attributes.get("discovery_review_state", ""), "confidence": attributes.get("discovery_confidence", ""), "provenance": attributes.get("discovery_provenance", []), }, "displayState": "show", "visibilitySource": "default", "visibilityReason": "default", "sourceReferences": _source_references(node), "deepLinks": _node_links(node_id, attributes), }, "classes": " ".join( part for part in (layer, kind, "unresolved" if is_unresolved else review_state) if part ), } ) node_layers = _node_data_index(elements, "layer") node_repos = _node_data_index(elements, "repo") node_kinds = _node_data_index(elements, "kind") edge_index = _append_infrastructure_elements( source_nodes, elements, node_layers, node_repos, repo_slugs, ) for edge in source_edges: source = source_repository_node_ids.get(str(edge.get("from", "")), str(edge.get("from", ""))) target = source_repository_node_ids.get(str(edge.get("to", "")), str(edge.get("to", ""))) edge_type = _presentation_edge_type(str(edge.get("type", "")), source, target, node_kinds) if not source or not target: continue elements.append( _edge_element( edge_index, source, target, edge_type, node_layers, node_repos, **_edge_metadata(edge, edge_type), ) ) edge_index += 1 for slug in sorted(repo_slugs): repo_id = f"repo:{slug}" for node in source_nodes: node_id = str(node.get("id", "")) if str(node.get("kind", "")) == "Repository": continue if str(node.get("repo", "")) != slug or not node_id: continue elements.append( _edge_element( edge_index, repo_id, node_id, "declares", node_layers, node_repos, display_only=True, ) ) edge_index += 1 visible_nodes = [element for element in elements if "source" not in element["data"]] visible_edges = [element for element in elements if "source" in element["data"]] return { "apiVersion": "railiance.fabric/v1alpha1", "kind": "GraphExplorerPayload", "manifest_id": "railiance-fabric.registry-map", "generated_at": _utc_now(), "repository": {"slug": "registry", "name": "Railiance Fabric Registry"}, "mode": "full", "profile": None, "metrics": { "node_count": len(visible_nodes), "edge_count": len(visible_edges), "hidden_count": 0, "blurred_count": 0, "registered_repo_count": len(registered_repo_slugs), "repo_node_count": len(repo_slugs), "deployment_node_count": sum(1 for element in visible_nodes if element["data"].get("kind") == "Deployment"), "server_node_count": sum(1 for element in visible_nodes if element["data"].get("kind") == "Server"), "registered_only_repo_count": sum( 1 for element in visible_nodes if element["data"].get("repo") in registered_repo_slugs and element["data"].get("lifecycle") == "registered-only" ), "unresolved_count": len(unresolved), }, "filter": { "rules": [], "manual_overrides": {}, "orphaned_overrides": [], "precedence": ( "later rules override earlier rules; remove excludes matching elements " "from layout and wins over manual overrides; otherwise manual overrides win last" ), "connected_edge_behavior": ( "edges connected to hidden nodes are hidden; edges connected to removed nodes are removed" ), }, "elements": elements, "hidden_elements": [], } def _utc_now() -> str: return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") def _layer_for_kind(kind: str) -> str: return _KIND_LAYER.get(kind, kind.lower() or "unknown") def _presentation_kind(kind: str, attributes: dict[str, Any]) -> str: if kind != "Server": return kind host = str(attributes.get("host") or "").strip().lower() runtime_type = str(attributes.get("runtime_target_type") or attributes.get("server_type") or "") if runtime_type == "kubernetes-service-dns" or host.endswith(".svc.cluster.local"): return "RuntimeService" if runtime_type in {"declared-endpoint", "ingress-host"} and _looks_like_domain(host): return "ApplicationEndpoint" return kind def _presentation_edge_type(edge_type: str, source: str, target: str, node_kinds: dict[str, str]) -> str: if edge_type == "resolves_to": target_kind = node_kinds.get(target, "") if target_kind == "ApplicationEndpoint": return "names_endpoint" if target_kind == "RuntimeService": return "routes_to_service" if edge_type == "opens_port" and node_kinds.get(source, "") in {"ApplicationEndpoint", "RuntimeService"}: return "listens_on" return edge_type def _edge_metadata(edge: dict[str, Any], edge_type: str) -> dict[str, Any]: canon_mapping = edge_canon_mapping(edge_type) attributes = edge.get("attributes") if isinstance(edge.get("attributes"), dict) else {} return { "canonical_type": str(edge.get("canonical_type") or canon_mapping.canonical_type), "canon_anchor": str(edge.get("canon_anchor") or canon_mapping.canon_anchor), "mapping_fit": str(edge.get("mapping_fit") or canon_mapping.fit), "display_only": bool(edge.get("display_only", canon_mapping.display_only)), "evidence_state": str(edge.get("evidence_state") or "declared"), "deployment_overlay": normalize_deployment_overlay( edge.get("deployment_overlay") if isinstance(edge.get("deployment_overlay"), dict) else {}, attributes, ), } def _edge_strength(edge_type: str) -> str: if edge_type.startswith("binds:"): status = edge_type.split(":", 1)[1] if status in {"missing", "disputed"}: return "weak" if status in {"accepted", "exact"}: return "strong" return "medium" return _EDGE_STRENGTH.get(edge_type, "medium") def _edge_width(strength: str) -> int: return {"weak": 1, "medium": 3, "strong": 5}.get(strength, 2) def _node_data_index(elements: list[dict[str, Any]], field: str) -> dict[str, str]: index: dict[str, str] = {} for element in elements: data = element.get("data", {}) if isinstance(data, dict) and "source" not in data: node_id = str(data.get("id") or "") if node_id: index[node_id] = str(data.get(field) or "") return index def _runtime_node_indexes(source_nodes: list[dict[str, Any]]) -> tuple[dict[str, str], dict[str, str]]: servers_by_host: dict[str, str] = {} ports_by_endpoint: dict[str, str] = {} for node in source_nodes: node_id = str(node.get("id") or "") if not node_id: continue attributes = node.get("attributes") if isinstance(node.get("attributes"), dict) else {} kind = _presentation_kind(str(node.get("kind") or ""), attributes) host = _normalize_endpoint_host(str(attributes.get("host") or "")) if kind == "Server" and host: servers_by_host.setdefault(host, node_id) if kind == "NetworkPort" and host: port = _int_value(attributes.get("port")) protocol = _normalize_protocol(str(attributes.get("protocol") or "tcp")) if port is not None: ports_by_endpoint.setdefault(_endpoint_key(host, port, protocol), node_id) return servers_by_host, ports_by_endpoint def _append_infrastructure_elements( source_nodes: list[dict[str, Any]], elements: list[dict[str, Any]], node_layers: dict[str, str], node_repos: dict[str, str], repo_slugs: set[str], ) -> int: edge_index = 0 endpoints_by_service = _endpoints_by_service(source_nodes) server_ids_by_host, port_ids_by_endpoint = _runtime_node_indexes(source_nodes) generated_edge_keys: set[tuple[str, str, str]] = set() def append_edge(source: str, target: str, edge_type: str, *, display_only: bool = True) -> None: nonlocal edge_index if not source or not target: return key = (source, edge_type, target) if key in generated_edge_keys: return generated_edge_keys.add(key) elements.append( _edge_element( edge_index, source, target, edge_type, node_layers, node_repos, display_only=display_only, ) ) edge_index += 1 service_nodes = sorted( (node for node in source_nodes if node.get("kind") == "ServiceDeclaration"), key=lambda node: str(node.get("id", "")), ) for service in service_nodes: service_id = str(service.get("id", "")) if not service_id: continue attributes = service.get("attributes") if isinstance(service.get("attributes"), dict) else {} environments = _environments(attributes) repo = str(service.get("repo") or "") service_name = str(service.get("name") or service_id) service_endpoints = endpoints_by_service.get(service_id, []) for environment in environments: deployment_id = f"deployment:{service_id}:{environment}" matching_endpoints = [ endpoint for endpoint in service_endpoints if _environment_matches(environment, endpoint["environments"]) ] deployment_overlay = normalize_deployment_overlay( attributes, {"deployment_environment": environment}, *[endpoint["deployment_overlay"] for endpoint in matching_endpoints], ) overlay_data = _overlay_data(deployment_overlay) server_hosts = sorted({endpoint["host"] for endpoint in matching_endpoints}) deployment_data = { "id": deployment_id, "stableKey": deployment_id, "kind": "Deployment", "layer": "deployment", "label": f"{service_name} [{environment}]", "name": f"{service_name} deployment ({environment})", "description": f"{service_name} deployment in {environment}.", "repo": repo, "domain": str(service.get("domain") or ""), "lifecycle": str(service.get("lifecycle") or ""), "environment": environment, **overlay_data, "serviceId": service_id, "serverHosts": server_hosts, "reviewState": "accepted", "freshnessState": "current", "unresolved": False, "confidence": 0.85 if server_hosts else 0.65, "visualSize": 42, "ownership": str(attributes.get("owner") or "repo"), "attributes": { "service_id": service_id, "environment": environment, "server_hosts": server_hosts, "source_service": service_id, }, "displayState": "show", "visibilitySource": "default", "visibilityReason": "default", "sourceReferences": _source_references(service), "deepLinks": {"service": f"/graph/nodes/{service_id}"}, } elements.append({"data": deployment_data, "classes": "deployment accepted"}) node_layers[deployment_id] = "deployment" node_repos[deployment_id] = repo append_edge(service_id, deployment_id, "deployed_as") if repo and repo in repo_slugs: repo_id = f"repo:{repo}" append_edge(repo_id, deployment_id, "owns_deployment") for endpoint in matching_endpoints: host = endpoint["host"] port = endpoint["port"] protocol = endpoint["protocol"] endpoint_overlay_data = _overlay_data( endpoint["deployment_overlay"], {"deployment_environment": environment}, ) server_id = server_ids_by_host.get(host) endpoint_key = _endpoint_key(host, port, protocol) port_id = port_ids_by_endpoint.get(endpoint_key) port_was_generated = port_id is None if server_id is None and _looks_like_machine_address(host): server_id = _server_id(host) server_ids_by_host[host] = server_id server_data = { "id": server_id, "stableKey": server_id, "kind": "Server", "layer": "server", "label": host, "name": host, "description": f"Server inferred from endpoint {endpoint['url']}.", "repo": "", "domain": str(service.get("domain") or ""), "lifecycle": "active", "environment": environment, **endpoint_overlay_data, "serverHost": host, "reviewState": "accepted", "freshnessState": "current", "unresolved": False, "confidence": 0.7, "visualSize": 48, "ownership": "inferred", "attributes": { "host": host, "source_interface_id": endpoint["interface_id"], "endpoint_url": endpoint["url"], "deployment_overlay": endpoint_overlay_data["deploymentOverlay"], }, "displayState": "show", "visibilitySource": "default", "visibilityReason": "default", "sourceReferences": [{"label": "Endpoint interface", "ref": endpoint["interface_id"]}], "deepLinks": {"interface": f"/graph/nodes/{endpoint['interface_id']}"}, } elements.append({"data": server_data, "classes": "server accepted inferred"}) node_layers[server_id] = "server" node_repos[server_id] = "" if port_id is None: port_id = _port_id(host, port, protocol) port_ids_by_endpoint[endpoint_key] = port_id port_data = { "id": port_id, "stableKey": port_id, "kind": "NetworkPort", "layer": "network", "label": f"{host}:{port}/{protocol}", "name": f"{host}:{port}/{protocol}", "description": f"Port inferred from endpoint {endpoint['url']}.", "repo": "", "domain": str(service.get("domain") or ""), "lifecycle": "active", "environment": environment, **endpoint_overlay_data, "serverHost": host, "reviewState": "accepted", "freshnessState": "current", "unresolved": False, "confidence": 0.7, "visualSize": 42, "ownership": "inferred", "attributes": { "host": host, "port": port, "protocol": protocol, "source_interface_id": endpoint["interface_id"], "endpoint_url": endpoint["url"], "deployment_overlay": endpoint_overlay_data["deploymentOverlay"], }, "displayState": "show", "visibilitySource": "default", "visibilityReason": "default", "sourceReferences": [{"label": "Endpoint interface", "ref": endpoint["interface_id"]}], "deepLinks": {"interface": f"/graph/nodes/{endpoint['interface_id']}"}, } elements.append({"data": port_data, "classes": "network accepted inferred"}) node_layers[port_id] = "network" node_repos[port_id] = "" if server_id: append_edge(deployment_id, server_id, "runs_on") if port_was_generated: append_edge(server_id, port_id, "opens_port") append_edge(deployment_id, port_id, "exposes_port") return edge_index def _endpoints_by_service(source_nodes: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]: endpoints: dict[str, list[dict[str, Any]]] = {} for node in source_nodes: if node.get("kind") != "InterfaceDeclaration": continue attributes = node.get("attributes") if isinstance(node.get("attributes"), dict) else {} endpoint = attributes.get("endpoint") if isinstance(attributes.get("endpoint"), dict) else {} url = str(endpoint.get("url") or "").strip() parsed = _parse_endpoint_url(url) service_id = str(attributes.get("service_id") or "") if not service_id or not parsed: continue host, port, protocol = parsed endpoints.setdefault(service_id, []).append( { "host": host, "port": port, "protocol": protocol, "url": url, "interface_id": str(node.get("id") or ""), "environments": _environments(attributes), "deployment_overlay": normalize_deployment_overlay( attributes, endpoint, { "host": host, "port": port, "protocol": protocol, "route": url, }, ), } ) return endpoints def _environments(attributes: dict[str, Any]) -> list[str]: environments = [ str(environment) for environment in attributes.get("environments", []) if str(environment).strip() ] return environments or ["all"] def _environment_matches(deployment_environment: str, endpoint_environments: list[str]) -> bool: return ( deployment_environment == "all" or "all" in endpoint_environments or deployment_environment in endpoint_environments ) def _endpoint_host(url: str) -> str: parsed = _parse_endpoint_url(url) return parsed[0] if parsed else "" def _parse_endpoint_url(url: str) -> tuple[str, int, str] | None: if not url: return None parsed = urlparse(url) host = _normalize_endpoint_host(parsed.hostname or parsed.netloc or parsed.path.split("/", 1)[0]) try: port = parsed.port except ValueError: port = None scheme = str(parsed.scheme or "").lower() port = port or {"http": 80, "https": 443, "postgres": 5432}.get(scheme) if not host or port is None: return None return host, port, "tcp" def _normalize_endpoint_host(host: str) -> str: value = str(host or "").strip().lower().strip("[]") if value in {"0.0.0.0", "::"}: return "localhost" return value def _endpoint_key(host: str, port: int, protocol: str) -> str: return f"{_normalize_endpoint_host(host)}:{port}/{_normalize_protocol(protocol)}" def _normalize_protocol(protocol: str) -> str: return str(protocol or "tcp").strip().lower() or "tcp" def _int_value(value: object) -> int | None: if isinstance(value, bool): return None if isinstance(value, int): return value text = str(value or "").strip() if text.isdecimal(): return int(text) return None def _looks_like_domain(host: str) -> bool: value = _normalize_endpoint_host(host) if not value or value == "localhost": return False if all(part.isdecimal() for part in value.split(".") if part): return False return "." in value def _looks_like_machine_address(host: str) -> bool: value = _normalize_endpoint_host(host) if value in {"localhost", "127.0.0.1"}: return True try: ipaddress.ip_address(value) except ValueError: return False return True def _server_id(host: str) -> str: key = sub(r"[^A-Za-z0-9._:+-]+", "-", host.lower()).strip("-") return f"server:{key or 'unknown'}" def _port_id(host: str, port: int, protocol: str) -> str: key = sub(r"[^A-Za-z0-9._:+-]+", "-", _endpoint_key(host, port, protocol)).strip("-") return f"port:{key or 'unknown'}" def _edge_element( edge_index: int, source: str, target: str, edge_type: str, node_layers: dict[str, str], node_repos: dict[str, str], *, canonical_type: str = "", canon_anchor: str = "", mapping_fit: str = "", display_only: bool = False, evidence_state: str = "", deployment_overlay: dict[str, Any] | None = None, ) -> dict[str, Any]: source_layer = node_layers.get(source, "unknown") target_layer = node_layers.get(target, "unknown") source_repo = node_repos.get(source, "") target_repo = node_repos.get(target, "") same_repo = bool(source_repo and source_repo == target_repo) canon_mapping = edge_canon_mapping(edge_type) canonical_type = canonical_type or canon_mapping.canonical_type canon_anchor = canon_anchor or canon_mapping.canon_anchor mapping_fit = mapping_fit or canon_mapping.fit display_only = display_only or canon_mapping.display_only evidence_state = evidence_state or "declared" strength = _edge_strength(edge_type) layout = _layout_hints(edge_type, source_layer, target_layer, same_repo) edge_id = f"edge:{edge_index}:{source}:{edge_type}:{target}" overlay_data = _overlay_data(deployment_overlay or {}) return { "data": { "id": edge_id, "stableKey": edge_id, "kind": "edge", "layer": "relationship", "label": edge_type, "source": source, "target": target, "sourceLayer": source_layer, "targetLayer": target_layer, "sourceRepo": source_repo, "targetRepo": target_repo, "edgeType": edge_type, "dependencyType": edge_type, "canonicalType": canonical_type, "canonAnchor": canon_anchor, "mappingFit": mapping_fit, "displayOnly": display_only, "evidenceState": evidence_state, **overlay_data, "strength": strength, "edgeWidth": _edge_width(strength), "sameLayer": source_layer == target_layer, "sameRepo": same_repo, "layoutAffinity": layout["affinity"], "layoutIdealLength": layout["ideal_length"], "layoutElasticity": layout["elasticity"], "reviewState": "accepted", "freshnessState": "current", "displayState": "show", "visibilitySource": "default", "visibilityReason": "default", "deepLinks": {}, }, "classes": " ".join( part for part in ( edge_type.replace(":", "-"), strength, str(layout["affinity"]), "display-only" if display_only else "canonical", "same-layer" if source_layer == target_layer else "", "same-repo" if same_repo else "", ) if part ), } def _overlay_data(*sources: object) -> dict[str, Any]: return graph_explorer_overlay_fields(normalize_deployment_overlay(*sources)) def _layout_hints( edge_type: str, source_layer: str, target_layer: str, same_repo: bool, ) -> dict[str, int | str]: if edge_type == "runs_on": return {"affinity": "deployment-server", "ideal_length": 42, "elasticity": 240} if edge_type == "deployed_as": return {"affinity": "service-deployment", "ideal_length": 58, "elasticity": 180} if edge_type == "owns_deployment": return {"affinity": "repo-deployment", "ideal_length": 150, "elasticity": 28} if edge_type == "declares": return {"affinity": "repo-declaration", "ideal_length": 88, "elasticity": 110} if same_repo: return {"affinity": "same-repo", "ideal_length": 72, "elasticity": 150} if "repository" in {source_layer, target_layer}: return {"affinity": "repo-loose", "ideal_length": 130, "elasticity": 45} return {"affinity": "cross-repo", "ideal_length": 190, "elasticity": 22} def _unresolved_dependency_ids( nodes: list[dict[str, Any]], edges: list[dict[str, Any]], ) -> set[str]: dependency_ids = { str(node.get("id", "")) for node in nodes if node.get("kind") == "DependencyDeclaration" and str(node.get("id", "")) } resolved: set[str] = set() unresolved: set[str] = set() for edge in edges: edge_type = str(edge.get("type", "")) source = str(edge.get("from", "")) if source not in dependency_ids or not edge_type.startswith("binds:"): continue status = edge_type.split(":", 1)[1] if status in {"accepted", "candidate", "exact", "compatible"}: resolved.add(source) if status in {"missing", "disputed"}: unresolved.add(source) return (dependency_ids - resolved) | unresolved def _node_description(kind: str, attributes: object) -> str: if not isinstance(attributes, dict): return "" description = str(attributes.get("description", "")) if description: return description if kind == "CapabilityDeclaration": return str(attributes.get("capability_type", "")) if kind == "InterfaceDeclaration": version = str(attributes.get("version", "")) interface_type = str(attributes.get("interface_type", "")) return " ".join(part for part in (interface_type, version) if part) if kind == "DependencyDeclaration": return str( attributes.get("requires_capability_type") or attributes.get("requires_capability_id") or attributes.get("interface_type") or "" ) if kind == "BindingAssertion": return str(attributes.get("status", "")) if kind == "RuntimeService": return str(attributes.get("runtime_target_type") or attributes.get("service_type") or "runtime service") if kind == "ApplicationEndpoint": return str(attributes.get("endpoint_url") or attributes.get("domain") or "application endpoint") if kind == "NetworkPort": host = str(attributes.get("host") or "") port = str(attributes.get("port") or "") protocol = str(attributes.get("protocol") or "") return " ".join(part for part in (host, port, protocol) if part) if kind == "DomainName": return str(attributes.get("domain") or "") return "" def _source_references(node: dict[str, Any]) -> list[dict[str, str]]: attributes = node.get("attributes") references: list[dict[str, str]] = [] if isinstance(attributes, dict): source_path = str(attributes.get("source_path") or "") if source_path: references.append({"label": "Declaration", "path": source_path}) for source in attributes.get("source_links", []): if isinstance(source, dict): references.append({key: str(value) for key, value in source.items()}) for anchor in attributes.get("discovery_source_anchors", []): if isinstance(anchor, dict): reference = { "label": f"Discovery {anchor.get('source_kind', 'source')}", } for key in ("path", "url", "ref", "json_pointer"): if anchor.get(key): reference[key] = str(anchor[key]) references.append(reference) return references def _node_links(node_id: str, attributes: dict[str, Any] | None = None) -> dict[str, str]: links = {"registry": f"/graph/nodes/{node_id}"} source_path = str((attributes or {}).get("source_path") or "") if source_path: links["sourceFile"] = source_path return links def _repository_links(repository: dict[str, Any]) -> dict[str, str]: slug = str(repository.get("slug", "")) links = {"registry": f"/repositories/{slug}"} if slug else {} state_hub_repo_id = str(repository.get("state_hub_repo_id") or "") if state_hub_repo_id: links["stateHubRepo"] = f"/repos/by-id/{state_hub_repo_id}" return links