generated from coulomb/repo-seed
108 lines
3.6 KiB
Python
108 lines
3.6 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
|
|
# Canonical overlay keys, in the order _merge_overlay_source looks them up.
# Each name here must also appear as a key in _ALIASES.
OVERLAY_FIELDS = (
    "deployment_environment",
    "deployment_scenario",
    "routing_authority",
    "access_zone",
    "policy_authority",
    "exposure_class",
)
|
|
|
|
# Keys copied into the "route_evidence" sub-dict, read both from a source's
# nested "route_evidence" dict and from the source's own top level.
ROUTE_EVIDENCE_FIELDS = (
    "host",
    "hostname",
    "port",
    "protocol",
    "route",
    "route_url",
    "path",
    "scheme",
)
|
|
|
|
_ALIASES = {
|
|
"deployment_environment": ("deployment_environment", "environment"),
|
|
"deployment_scenario": ("deployment_scenario", "deployment_scenario_id", "scenario"),
|
|
"routing_authority": ("routing_authority",),
|
|
"access_zone": ("access_zone",),
|
|
"policy_authority": ("policy_authority",),
|
|
"exposure_class": ("exposure_class", "exposure"),
|
|
}
|
|
|
|
|
|
def normalize_deployment_overlay(*sources: object) -> dict[str, Any]:
    """Gather deployment-zone overlay fields from *sources* into one dict.

    Arguments that are not dicts are ignored. For each dict argument, a
    nested ``"deployment_overlay"`` dict (when present) is merged before the
    argument's own top-level keys. A field that has already been filled is
    never overwritten, so earlier sources take precedence over later ones.

    Any route details found along the way are attached under the
    ``"route_evidence"`` key. Entries without a meaningful value are left
    out of the result.

    Per the original docstring, this collection step does not change fabric
    membership.
    """
    collected: dict[str, Any] = {}
    routes: dict[str, Any] = {}

    for candidate in sources:
        if isinstance(candidate, dict):
            embedded = candidate.get("deployment_overlay")
            # The explicit nested overlay outranks the candidate's own keys.
            if isinstance(embedded, dict):
                _merge_overlay_source(collected, routes, embedded)
            _merge_overlay_source(collected, routes, candidate)

    if routes:
        collected["route_evidence"] = routes

    # Return a fresh dict holding only the entries that carry a value.
    result: dict[str, Any] = {}
    for name, item in collected.items():
        if _has_value(item):
            result[name] = item
    return result
|
|
|
|
|
|
def graph_explorer_overlay_fields(overlay: dict[str, Any]) -> dict[str, Any]:
    """Flatten *overlay* into camelCase fields.

    The returned dict carries the overlay itself under ``"deploymentOverlay"``
    (the same object, not a copy), followed by one string entry per overlay
    field and per exposed route field. Missing or falsy values become ``""``.
    ``"routePort"`` is the exception: it keeps the stored port value as-is
    (so ``0`` survives) and only falls back to ``""`` for ``None`` or ``""``.
    """
    evidence = overlay.get("route_evidence")
    if not isinstance(evidence, dict):
        # Anything other than a dict is treated as "no route evidence".
        evidence = {}

    fields: dict[str, Any] = {"deploymentOverlay": overlay}

    overlay_columns = (
        ("deploymentEnvironment", "deployment_environment"),
        ("deploymentScenario", "deployment_scenario"),
        ("routingAuthority", "routing_authority"),
        ("accessZone", "access_zone"),
        ("policyAuthority", "policy_authority"),
        ("exposureClass", "exposure_class"),
    )
    for out_key, overlay_key in overlay_columns:
        fields[out_key] = str(overlay.get(overlay_key) or "")

    fields["routeHost"] = str(evidence.get("host") or "")
    fields["routeHostname"] = str(evidence.get("hostname") or "")

    port = evidence.get("port")
    if port in (None, ""):
        port = ""
    fields["routePort"] = port

    fields["routeProtocol"] = str(evidence.get("protocol") or "")
    # "route" falls back to "route_url" when no plain route is recorded.
    fields["route"] = str(evidence.get("route") or evidence.get("route_url") or "")
    return fields
|
|
|
|
|
|
def _merge_overlay_source(overlay: dict[str, Any], route_evidence: dict[str, Any], source: dict[str, Any]) -> None:
    """Fill *overlay* and *route_evidence* in place from one *source* dict.

    Only gaps are filled: a key already present in either accumulator is
    left untouched. Overlay fields are resolved through their alias lists.
    Route fields are taken first from a nested ``"route_evidence"`` dict
    (when *source* has one) and then from the top level of *source*.
    """
    for name in OVERLAY_FIELDS:
        if name not in overlay:
            candidate = _first_value(source, _ALIASES[name])
            if _has_value(candidate):
                overlay[name] = candidate

    # Ordered list of places to look for route fields; nested evidence first.
    route_containers = []
    embedded = source.get("route_evidence")
    if isinstance(embedded, dict):
        route_containers.append(embedded)
    route_containers.append(source)

    for container in route_containers:
        for name in ROUTE_EVIDENCE_FIELDS:
            if name in route_evidence:
                continue
            candidate = container.get(name)
            if _has_value(candidate):
                route_evidence[name] = candidate
|
|
|
|
|
|
def _first_value(source: dict[str, Any], keys: tuple[str, ...]) -> Any:
    """Return the first meaningful value found in *source* under *keys*.

    Keys are tried in the given order; ``""`` is returned when none of them
    holds a value that ``_has_value`` accepts.
    """
    looked_up = (source.get(name) for name in keys)
    return next((item for item in looked_up if _has_value(item)), "")
|
|
|
|
|
|
def _has_value(value: Any) -> bool:
|
|
if isinstance(value, dict):
|
|
return any(_has_value(item) for item in value.values())
|
|
if isinstance(value, list):
|
|
return any(_has_value(item) for item in value)
|
|
return value not in (None, "")
|