generated from coulomb/repo-seed
Refine runtime entity taxonomy
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import ipaddress
|
||||
from datetime import datetime, timezone
|
||||
from re import sub
|
||||
from typing import Any
|
||||
@@ -10,6 +11,10 @@ DISPLAY_STATES = ("show", "blur", "hide", "highlight", "remove")
|
||||
LAYER_ORDER = (
|
||||
"repository",
|
||||
"server",
|
||||
"runtime_service",
|
||||
"application",
|
||||
"network",
|
||||
"domain",
|
||||
"deployment",
|
||||
"service",
|
||||
"capability",
|
||||
@@ -22,6 +27,10 @@ LAYER_ORDER = (
|
||||
_KIND_LAYER = {
|
||||
"Repository": "repository",
|
||||
"Server": "server",
|
||||
"RuntimeService": "runtime_service",
|
||||
"ApplicationEndpoint": "application",
|
||||
"NetworkPort": "network",
|
||||
"DomainName": "domain",
|
||||
"Deployment": "deployment",
|
||||
"ServiceDeclaration": "service",
|
||||
"CapabilityDeclaration": "capability",
|
||||
@@ -34,6 +43,10 @@ _KIND_LAYER = {
|
||||
_LAYER_COLORS = {
|
||||
"repository": "#475569",
|
||||
"server": "#334155",
|
||||
"runtime_service": "#0369a1",
|
||||
"application": "#0f766e",
|
||||
"network": "#64748b",
|
||||
"domain": "#9333ea",
|
||||
"deployment": "#16a34a",
|
||||
"service": "#0f766e",
|
||||
"capability": "#2563eb",
|
||||
@@ -51,6 +64,13 @@ _EDGE_STRENGTH = {
|
||||
"uses_interface": "medium",
|
||||
"declares": "weak",
|
||||
"deployed_as": "medium",
|
||||
"exposes_port": "strong",
|
||||
"listens_on": "strong",
|
||||
"names_endpoint": "medium",
|
||||
"opens_port": "strong",
|
||||
"routes_to_port": "medium",
|
||||
"routes_to_service": "strong",
|
||||
"resolves_to": "medium",
|
||||
"runs_on": "strong",
|
||||
"owns_deployment": "weak",
|
||||
}
|
||||
@@ -287,12 +307,13 @@ def fabric_graph_explorer_payload(
|
||||
node_id = str(node.get("id", ""))
|
||||
if not node_id:
|
||||
continue
|
||||
kind = str(node.get("kind", ""))
|
||||
if kind == "Repository":
|
||||
source_kind = str(node.get("kind", ""))
|
||||
if source_kind == "Repository":
|
||||
continue
|
||||
attributes = node.get("attributes") if isinstance(node.get("attributes"), dict) else {}
|
||||
kind = _presentation_kind(source_kind, attributes)
|
||||
layer = _layer_for_kind(kind)
|
||||
is_unresolved = node_id in unresolved
|
||||
attributes = node.get("attributes") if isinstance(node.get("attributes"), dict) else {}
|
||||
review_state = str(attributes.get("discovery_review_state") or "accepted")
|
||||
confidence = attributes.get("discovery_confidence")
|
||||
elements.append(
|
||||
@@ -318,7 +339,7 @@ def fabric_graph_explorer_payload(
|
||||
),
|
||||
"visualSize": 34 if layer == "binding" else 46 if is_unresolved else 50,
|
||||
"ownership": str(attributes.get("owner") or attributes.get("discovery_origin") or "repo"),
|
||||
"attributes": attributes,
|
||||
"attributes": {**attributes, "source_kind": source_kind} if source_kind != kind else attributes,
|
||||
"discovery": {
|
||||
"stableKey": attributes.get("discovery_stable_key", ""),
|
||||
"origin": attributes.get("discovery_origin", ""),
|
||||
@@ -342,6 +363,7 @@ def fabric_graph_explorer_payload(
|
||||
|
||||
node_layers = _node_data_index(elements, "layer")
|
||||
node_repos = _node_data_index(elements, "repo")
|
||||
node_kinds = _node_data_index(elements, "kind")
|
||||
edge_index = _append_infrastructure_elements(
|
||||
source_nodes,
|
||||
elements,
|
||||
@@ -353,7 +375,7 @@ def fabric_graph_explorer_payload(
|
||||
for edge in source_edges:
|
||||
source = source_repository_node_ids.get(str(edge.get("from", "")), str(edge.get("from", "")))
|
||||
target = source_repository_node_ids.get(str(edge.get("to", "")), str(edge.get("to", "")))
|
||||
edge_type = str(edge.get("type", ""))
|
||||
edge_type = _presentation_edge_type(str(edge.get("type", "")), source, target, node_kinds)
|
||||
if not source or not target:
|
||||
continue
|
||||
elements.append(_edge_element(edge_index, source, target, edge_type, node_layers, node_repos))
|
||||
@@ -422,6 +444,30 @@ def _layer_for_kind(kind: str) -> str:
|
||||
return _KIND_LAYER.get(kind, kind.lower() or "unknown")
|
||||
|
||||
|
||||
def _presentation_kind(kind: str, attributes: dict[str, Any]) -> str:
|
||||
if kind != "Server":
|
||||
return kind
|
||||
host = str(attributes.get("host") or "").strip().lower()
|
||||
runtime_type = str(attributes.get("runtime_target_type") or attributes.get("server_type") or "")
|
||||
if runtime_type == "kubernetes-service-dns" or host.endswith(".svc.cluster.local"):
|
||||
return "RuntimeService"
|
||||
if runtime_type in {"declared-endpoint", "ingress-host"} and _looks_like_domain(host):
|
||||
return "ApplicationEndpoint"
|
||||
return kind
|
||||
|
||||
|
||||
def _presentation_edge_type(edge_type: str, source: str, target: str, node_kinds: dict[str, str]) -> str:
|
||||
if edge_type == "resolves_to":
|
||||
target_kind = node_kinds.get(target, "")
|
||||
if target_kind == "ApplicationEndpoint":
|
||||
return "names_endpoint"
|
||||
if target_kind == "RuntimeService":
|
||||
return "routes_to_service"
|
||||
if edge_type == "opens_port" and node_kinds.get(source, "") in {"ApplicationEndpoint", "RuntimeService"}:
|
||||
return "listens_on"
|
||||
return edge_type
|
||||
|
||||
|
||||
def _edge_strength(edge_type: str) -> str:
|
||||
if edge_type.startswith("binds:"):
|
||||
status = edge_type.split(":", 1)[1]
|
||||
@@ -448,6 +494,26 @@ def _node_data_index(elements: list[dict[str, Any]], field: str) -> dict[str, st
|
||||
return index
|
||||
|
||||
|
||||
def _runtime_node_indexes(source_nodes: list[dict[str, Any]]) -> tuple[dict[str, str], dict[str, str]]:
|
||||
servers_by_host: dict[str, str] = {}
|
||||
ports_by_endpoint: dict[str, str] = {}
|
||||
for node in source_nodes:
|
||||
node_id = str(node.get("id") or "")
|
||||
if not node_id:
|
||||
continue
|
||||
attributes = node.get("attributes") if isinstance(node.get("attributes"), dict) else {}
|
||||
kind = _presentation_kind(str(node.get("kind") or ""), attributes)
|
||||
host = _normalize_endpoint_host(str(attributes.get("host") or ""))
|
||||
if kind == "Server" and host:
|
||||
servers_by_host.setdefault(host, node_id)
|
||||
if kind == "NetworkPort" and host:
|
||||
port = _int_value(attributes.get("port"))
|
||||
protocol = _normalize_protocol(str(attributes.get("protocol") or "tcp"))
|
||||
if port is not None:
|
||||
ports_by_endpoint.setdefault(_endpoint_key(host, port, protocol), node_id)
|
||||
return servers_by_host, ports_by_endpoint
|
||||
|
||||
|
||||
def _append_infrastructure_elements(
|
||||
source_nodes: list[dict[str, Any]],
|
||||
elements: list[dict[str, Any]],
|
||||
@@ -457,7 +523,19 @@ def _append_infrastructure_elements(
|
||||
) -> int:
|
||||
edge_index = 0
|
||||
endpoints_by_service = _endpoints_by_service(source_nodes)
|
||||
server_ids_by_host: dict[str, str] = {}
|
||||
server_ids_by_host, port_ids_by_endpoint = _runtime_node_indexes(source_nodes)
|
||||
generated_edge_keys: set[tuple[str, str, str]] = set()
|
||||
|
||||
def append_edge(source: str, target: str, edge_type: str) -> None:
|
||||
nonlocal edge_index
|
||||
if not source or not target:
|
||||
return
|
||||
key = (source, edge_type, target)
|
||||
if key in generated_edge_keys:
|
||||
return
|
||||
generated_edge_keys.add(key)
|
||||
elements.append(_edge_element(edge_index, source, target, edge_type, node_layers, node_repos))
|
||||
edge_index += 1
|
||||
|
||||
service_nodes = sorted(
|
||||
(node for node in source_nodes if node.get("kind") == "ServiceDeclaration"),
|
||||
@@ -516,31 +594,35 @@ def _append_infrastructure_elements(
|
||||
node_layers[deployment_id] = "deployment"
|
||||
node_repos[deployment_id] = repo
|
||||
|
||||
elements.append(_edge_element(edge_index, service_id, deployment_id, "deployed_as", node_layers, node_repos))
|
||||
edge_index += 1
|
||||
append_edge(service_id, deployment_id, "deployed_as")
|
||||
if repo and repo in repo_slugs:
|
||||
repo_id = f"repo:{repo}"
|
||||
elements.append(_edge_element(edge_index, repo_id, deployment_id, "owns_deployment", node_layers, node_repos))
|
||||
edge_index += 1
|
||||
append_edge(repo_id, deployment_id, "owns_deployment")
|
||||
|
||||
for endpoint in matching_endpoints:
|
||||
server_id = server_ids_by_host.get(endpoint["host"])
|
||||
if server_id is None:
|
||||
server_id = _server_id(endpoint["host"])
|
||||
server_ids_by_host[endpoint["host"]] = server_id
|
||||
host = endpoint["host"]
|
||||
port = endpoint["port"]
|
||||
protocol = endpoint["protocol"]
|
||||
server_id = server_ids_by_host.get(host)
|
||||
endpoint_key = _endpoint_key(host, port, protocol)
|
||||
port_id = port_ids_by_endpoint.get(endpoint_key)
|
||||
port_was_generated = port_id is None
|
||||
if server_id is None and _looks_like_machine_address(host):
|
||||
server_id = _server_id(host)
|
||||
server_ids_by_host[host] = server_id
|
||||
server_data = {
|
||||
"id": server_id,
|
||||
"stableKey": server_id,
|
||||
"kind": "Server",
|
||||
"layer": "server",
|
||||
"label": endpoint["host"],
|
||||
"name": endpoint["host"],
|
||||
"label": host,
|
||||
"name": host,
|
||||
"description": f"Server inferred from endpoint {endpoint['url']}.",
|
||||
"repo": "",
|
||||
"domain": str(service.get("domain") or ""),
|
||||
"lifecycle": "active",
|
||||
"environment": environment,
|
||||
"serverHost": endpoint["host"],
|
||||
"serverHost": host,
|
||||
"reviewState": "accepted",
|
||||
"freshnessState": "current",
|
||||
"unresolved": False,
|
||||
@@ -548,7 +630,7 @@ def _append_infrastructure_elements(
|
||||
"visualSize": 48,
|
||||
"ownership": "inferred",
|
||||
"attributes": {
|
||||
"host": endpoint["host"],
|
||||
"host": host,
|
||||
"source_interface_id": endpoint["interface_id"],
|
||||
"endpoint_url": endpoint["url"],
|
||||
},
|
||||
@@ -561,8 +643,49 @@ def _append_infrastructure_elements(
|
||||
elements.append({"data": server_data, "classes": "server accepted inferred"})
|
||||
node_layers[server_id] = "server"
|
||||
node_repos[server_id] = ""
|
||||
elements.append(_edge_element(edge_index, deployment_id, server_id, "runs_on", node_layers, node_repos))
|
||||
edge_index += 1
|
||||
if port_id is None:
|
||||
port_id = _port_id(host, port, protocol)
|
||||
port_ids_by_endpoint[endpoint_key] = port_id
|
||||
port_data = {
|
||||
"id": port_id,
|
||||
"stableKey": port_id,
|
||||
"kind": "NetworkPort",
|
||||
"layer": "network",
|
||||
"label": f"{host}:{port}/{protocol}",
|
||||
"name": f"{host}:{port}/{protocol}",
|
||||
"description": f"Port inferred from endpoint {endpoint['url']}.",
|
||||
"repo": "",
|
||||
"domain": str(service.get("domain") or ""),
|
||||
"lifecycle": "active",
|
||||
"environment": environment,
|
||||
"serverHost": host,
|
||||
"reviewState": "accepted",
|
||||
"freshnessState": "current",
|
||||
"unresolved": False,
|
||||
"confidence": 0.7,
|
||||
"visualSize": 42,
|
||||
"ownership": "inferred",
|
||||
"attributes": {
|
||||
"host": host,
|
||||
"port": port,
|
||||
"protocol": protocol,
|
||||
"source_interface_id": endpoint["interface_id"],
|
||||
"endpoint_url": endpoint["url"],
|
||||
},
|
||||
"displayState": "show",
|
||||
"visibilitySource": "default",
|
||||
"visibilityReason": "default",
|
||||
"sourceReferences": [{"label": "Endpoint interface", "ref": endpoint["interface_id"]}],
|
||||
"deepLinks": {"interface": f"/graph/nodes/{endpoint['interface_id']}"},
|
||||
}
|
||||
elements.append({"data": port_data, "classes": "network accepted inferred"})
|
||||
node_layers[port_id] = "network"
|
||||
node_repos[port_id] = ""
|
||||
if server_id:
|
||||
append_edge(deployment_id, server_id, "runs_on")
|
||||
if port_was_generated:
|
||||
append_edge(server_id, port_id, "opens_port")
|
||||
append_edge(deployment_id, port_id, "exposes_port")
|
||||
return edge_index
|
||||
|
||||
|
||||
@@ -574,13 +697,16 @@ def _endpoints_by_service(source_nodes: list[dict[str, Any]]) -> dict[str, list[
|
||||
attributes = node.get("attributes") if isinstance(node.get("attributes"), dict) else {}
|
||||
endpoint = attributes.get("endpoint") if isinstance(attributes.get("endpoint"), dict) else {}
|
||||
url = str(endpoint.get("url") or "").strip()
|
||||
host = _endpoint_host(url)
|
||||
parsed = _parse_endpoint_url(url)
|
||||
service_id = str(attributes.get("service_id") or "")
|
||||
if not service_id or not host:
|
||||
if not service_id or not parsed:
|
||||
continue
|
||||
host, port, protocol = parsed
|
||||
endpoints.setdefault(service_id, []).append(
|
||||
{
|
||||
"host": host,
|
||||
"port": port,
|
||||
"protocol": protocol,
|
||||
"url": url,
|
||||
"interface_id": str(node.get("id") or ""),
|
||||
"environments": _environments(attributes),
|
||||
@@ -607,11 +733,70 @@ def _environment_matches(deployment_environment: str, endpoint_environments: lis
|
||||
|
||||
|
||||
def _endpoint_host(url: str) -> str:
|
||||
parsed = _parse_endpoint_url(url)
|
||||
return parsed[0] if parsed else ""
|
||||
|
||||
|
||||
def _parse_endpoint_url(url: str) -> tuple[str, int, str] | None:
|
||||
if not url:
|
||||
return ""
|
||||
return None
|
||||
parsed = urlparse(url)
|
||||
host = parsed.netloc or parsed.path.split("/", 1)[0]
|
||||
return host.strip().lower()
|
||||
host = _normalize_endpoint_host(parsed.hostname or parsed.netloc or parsed.path.split("/", 1)[0])
|
||||
try:
|
||||
port = parsed.port
|
||||
except ValueError:
|
||||
port = None
|
||||
scheme = str(parsed.scheme or "").lower()
|
||||
port = port or {"http": 80, "https": 443, "postgres": 5432}.get(scheme)
|
||||
if not host or port is None:
|
||||
return None
|
||||
return host, port, "tcp"
|
||||
|
||||
|
||||
def _normalize_endpoint_host(host: str) -> str:
|
||||
value = str(host or "").strip().lower().strip("[]")
|
||||
if value in {"0.0.0.0", "::"}:
|
||||
return "localhost"
|
||||
return value
|
||||
|
||||
|
||||
def _endpoint_key(host: str, port: int, protocol: str) -> str:
|
||||
return f"{_normalize_endpoint_host(host)}:{port}/{_normalize_protocol(protocol)}"
|
||||
|
||||
|
||||
def _normalize_protocol(protocol: str) -> str:
|
||||
return str(protocol or "tcp").strip().lower() or "tcp"
|
||||
|
||||
|
||||
def _int_value(value: object) -> int | None:
|
||||
if isinstance(value, bool):
|
||||
return None
|
||||
if isinstance(value, int):
|
||||
return value
|
||||
text = str(value or "").strip()
|
||||
if text.isdecimal():
|
||||
return int(text)
|
||||
return None
|
||||
|
||||
|
||||
def _looks_like_domain(host: str) -> bool:
|
||||
value = _normalize_endpoint_host(host)
|
||||
if not value or value == "localhost":
|
||||
return False
|
||||
if all(part.isdecimal() for part in value.split(".") if part):
|
||||
return False
|
||||
return "." in value
|
||||
|
||||
|
||||
def _looks_like_machine_address(host: str) -> bool:
|
||||
value = _normalize_endpoint_host(host)
|
||||
if value in {"localhost", "127.0.0.1"}:
|
||||
return True
|
||||
try:
|
||||
ipaddress.ip_address(value)
|
||||
except ValueError:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def _server_id(host: str) -> str:
|
||||
@@ -619,6 +804,11 @@ def _server_id(host: str) -> str:
|
||||
return f"server:{key or 'unknown'}"
|
||||
|
||||
|
||||
def _port_id(host: str, port: int, protocol: str) -> str:
|
||||
key = sub(r"[^A-Za-z0-9._:+-]+", "-", _endpoint_key(host, port, protocol)).strip("-")
|
||||
return f"port:{key or 'unknown'}"
|
||||
|
||||
|
||||
def _edge_element(
|
||||
edge_index: int,
|
||||
source: str,
|
||||
@@ -744,6 +934,17 @@ def _node_description(kind: str, attributes: object) -> str:
|
||||
)
|
||||
if kind == "BindingAssertion":
|
||||
return str(attributes.get("status", ""))
|
||||
if kind == "RuntimeService":
|
||||
return str(attributes.get("runtime_target_type") or attributes.get("service_type") or "runtime service")
|
||||
if kind == "ApplicationEndpoint":
|
||||
return str(attributes.get("endpoint_url") or attributes.get("domain") or "application endpoint")
|
||||
if kind == "NetworkPort":
|
||||
host = str(attributes.get("host") or "")
|
||||
port = str(attributes.get("port") or "")
|
||||
protocol = str(attributes.get("protocol") or "")
|
||||
return " ".join(part for part in (host, port, protocol) if part)
|
||||
if kind == "DomainName":
|
||||
return str(attributes.get("domain") or "")
|
||||
return ""
|
||||
|
||||
|
||||
|
||||
@@ -38,9 +38,12 @@ PATH_SCOPED_NODE_KINDS = {
|
||||
}
|
||||
EVIDENCE_AGGREGATE_EDGE_TYPES = {
|
||||
"exposes_port",
|
||||
"listens_on",
|
||||
"names_endpoint",
|
||||
"opens_port",
|
||||
"resolves_to",
|
||||
"routes_to_port",
|
||||
"routes_to_service",
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import ipaddress
|
||||
import re
|
||||
import subprocess
|
||||
import tomllib
|
||||
@@ -1118,10 +1119,11 @@ def _add_runtime_endpoint(
|
||||
if not host or port_number is None:
|
||||
return ""
|
||||
|
||||
server_key = discovery_stable_key(context.repo_slug, "Server", host)
|
||||
target_kind = _runtime_target_kind(host, server_type)
|
||||
target_key = discovery_stable_key(context.repo_slug, target_kind, host)
|
||||
context.accumulator.add_node(
|
||||
stable_key=server_key,
|
||||
kind="Server",
|
||||
stable_key=target_key,
|
||||
kind=target_kind,
|
||||
label=host,
|
||||
replacement_scope=scope,
|
||||
provenance=provenance,
|
||||
@@ -1129,7 +1131,7 @@ def _add_runtime_endpoint(
|
||||
aliases=[host],
|
||||
attributes={
|
||||
"host": host,
|
||||
"server_type": server_type,
|
||||
"runtime_target_type": server_type,
|
||||
**(attributes or {}),
|
||||
},
|
||||
confidence=confidence,
|
||||
@@ -1154,8 +1156,8 @@ def _add_runtime_endpoint(
|
||||
confidence=confidence,
|
||||
)
|
||||
context.accumulator.add_edge(
|
||||
edge_type="opens_port",
|
||||
source_key=server_key,
|
||||
edge_type="opens_port" if target_kind == "Server" else "listens_on",
|
||||
source_key=target_key,
|
||||
target_key=port_key,
|
||||
replacement_scope=scope,
|
||||
provenance=provenance,
|
||||
@@ -1174,9 +1176,31 @@ def _add_runtime_endpoint(
|
||||
)
|
||||
route_domain = _normalize_domain(domain)
|
||||
if route_domain:
|
||||
_add_domain_route(context, scope, provenance, anchor, route_domain, port_key, host, confidence=confidence)
|
||||
_add_domain_route(
|
||||
context,
|
||||
scope,
|
||||
provenance,
|
||||
anchor,
|
||||
route_domain,
|
||||
port_key,
|
||||
host,
|
||||
runtime_target_key=target_key,
|
||||
runtime_target_kind=target_kind,
|
||||
confidence=confidence,
|
||||
)
|
||||
elif _looks_like_domain(host):
|
||||
_add_domain_route(context, scope, provenance, anchor, host, port_key, host, confidence=confidence)
|
||||
_add_domain_route(
|
||||
context,
|
||||
scope,
|
||||
provenance,
|
||||
anchor,
|
||||
host,
|
||||
port_key,
|
||||
host,
|
||||
runtime_target_key=target_key,
|
||||
runtime_target_kind=target_kind,
|
||||
confidence=confidence,
|
||||
)
|
||||
return port_key
|
||||
|
||||
|
||||
@@ -1225,13 +1249,14 @@ def _add_domain_route(
|
||||
port_key: str,
|
||||
server_host: str,
|
||||
*,
|
||||
runtime_target_key: str = "",
|
||||
runtime_target_kind: str = "",
|
||||
confidence: float,
|
||||
) -> None:
|
||||
domain_value = _normalize_domain(domain)
|
||||
if not domain_value:
|
||||
return
|
||||
domain_key = discovery_stable_key(context.repo_slug, "DomainName", domain_value)
|
||||
server_key = discovery_stable_key(context.repo_slug, "Server", _normalize_host(server_host))
|
||||
context.accumulator.add_node(
|
||||
stable_key=domain_key,
|
||||
kind="DomainName",
|
||||
@@ -1253,7 +1278,23 @@ def _add_domain_route(
|
||||
source_anchor=anchor,
|
||||
confidence=confidence,
|
||||
)
|
||||
if server_host:
|
||||
if runtime_target_key:
|
||||
edge_type = {
|
||||
"ApplicationEndpoint": "names_endpoint",
|
||||
"RuntimeService": "routes_to_service",
|
||||
"Server": "resolves_to",
|
||||
}.get(runtime_target_kind, "routes_to")
|
||||
context.accumulator.add_edge(
|
||||
edge_type=edge_type,
|
||||
source_key=domain_key,
|
||||
target_key=runtime_target_key,
|
||||
replacement_scope=scope,
|
||||
provenance=provenance,
|
||||
source_anchor=anchor,
|
||||
confidence=confidence,
|
||||
)
|
||||
elif server_host:
|
||||
server_key = discovery_stable_key(context.repo_slug, "Server", _normalize_host(server_host))
|
||||
context.accumulator.add_edge(
|
||||
edge_type="resolves_to",
|
||||
source_key=domain_key,
|
||||
@@ -1509,6 +1550,28 @@ def _looks_like_domain(host: str) -> bool:
|
||||
return "." in value
|
||||
|
||||
|
||||
def _runtime_target_kind(host: str, runtime_target_type: str) -> str:
|
||||
value = _normalize_host(host)
|
||||
if _looks_like_machine_address(value):
|
||||
return "Server"
|
||||
if runtime_target_type == "kubernetes-service-dns" or value.endswith(".svc.cluster.local"):
|
||||
return "RuntimeService"
|
||||
if runtime_target_type == "ingress-host" or _looks_like_domain(value):
|
||||
return "ApplicationEndpoint"
|
||||
return "RuntimeService"
|
||||
|
||||
|
||||
def _looks_like_machine_address(host: str) -> bool:
|
||||
value = _normalize_host(host)
|
||||
if value in {"localhost", "127.0.0.1"}:
|
||||
return True
|
||||
try:
|
||||
ipaddress.ip_address(value)
|
||||
except ValueError:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def _int_value(value: object) -> int | None:
|
||||
if isinstance(value, bool):
|
||||
return None
|
||||
|
||||
Reference in New Issue
Block a user