Files

213 lines
8.7 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from typing import Any
# Vocabulary of canon node categories. The "unknown" placeholder category used
# by UNKNOWN_NODE_MAPPING is deliberately not listed here.
CANONICAL_NODE_CATEGORIES = (
    "source-repository",
    "software-system",
    "service",
    "endpoint",
    "deployment",
    "runtime-resource",
    "datastore",
    "flow",
    "policy",
    "control",
    "evidence",
    "task",
    "consumer-purpose",
    "telemetry-signal",
)

# Canon edge types. edge_canon_mapping() treats an edge whose type is already
# one of these (and has no explicit table entry) as a "direct" fit.
CANONICAL_EDGE_TYPES = (
    "built_from",
    "implements",
    "exposes",
    "depends_on",
    "deploys",
    "flows_to",
    "governed_by",
    "evidenced_by",
    "observed_by",
    "part_of",
    "reads_or_writes",
    "creates_task",
)

# Edge types that edge_canon_mapping() reports with display_only=True when no
# explicit table entry exists for them.
DISPLAY_ONLY_EDGE_TYPES = (
    "collapsed_into",
    "declares",
    "grouped_with",
    "highlight_path",
    "near",
    "owns_deployment",
    "same_color_group",
)

# Every value evidence_state_for() can return.
EVIDENCE_STATES = (
    "observed",
    "declared",
    "inferred",
    "proposed",
    "gap",
)

# Vocabulary for the ``fit`` field of the mapping records in this module.
MAPPING_FITS = (
    "direct",
    "partial",
    "conflict",
    "gap",
    "unknown",
)
@dataclass(frozen=True)
class CanonNodeMapping:
    """Immutable record describing how a Fabric node kind maps onto the canon."""

    # Canon node category, e.g. "endpoint"; "unknown" when nothing is mapped.
    category: str
    # Canon model section the mapping is anchored to, e.g. "model/network".
    canon_anchor: str
    # How well the kind fits the category (this file uses values from
    # MAPPING_FITS such as "direct", "partial" and "gap").
    fit: str
    # Optional free-text explanation of the mapping.
    notes: str = ""
@dataclass(frozen=True)
class CanonEdgeMapping:
    """Immutable record describing how a Fabric edge type maps onto the canon."""

    # Canon edge type, e.g. "depends_on"; empty when nothing is mapped.
    canonical_type: str
    # Canon model section the mapping is anchored to, e.g. "model/network".
    canon_anchor: str
    # How well the edge type fits (this file uses values from MAPPING_FITS
    # such as "direct", "partial" and "gap").
    fit: str
    # Flag set for edge types that are marked display-only in this module.
    display_only: bool = False
    # Optional free-text explanation of the mapping.
    notes: str = ""
# Fallback returned by node_canon_mapping() for node kinds with no mapping.
UNKNOWN_NODE_MAPPING = CanonNodeMapping(
    category="unknown",
    canon_anchor="",
    fit="gap",
    notes="No canon mapping has been selected for this Fabric node kind yet.",
)

# Fallback returned by edge_canon_mapping() for edge types with no mapping.
UNKNOWN_EDGE_MAPPING = CanonEdgeMapping(
    canonical_type="",
    canon_anchor="",
    fit="gap",
    notes="No canon mapping has been selected for this Fabric edge type yet.",
)
# Explicit Fabric node kind -> canon mapping table. Each value is
# CanonNodeMapping(category, canon_anchor, fit). Kinds that start with
# "Kubernetes" are not listed here; node_canon_mapping() handles them by prefix.
NODE_KIND_CANON_MAP: dict[str, CanonNodeMapping] = {
    "ApplicationEndpoint": CanonNodeMapping("endpoint", "model/network", "direct"),
    "BindingAssertion": CanonNodeMapping("evidence", "model/observability", "partial"),
    "CapabilityDeclaration": CanonNodeMapping("software-system", "model/landscape", "partial"),
    "ContainerBuild": CanonNodeMapping("deployment", "model/devsecops", "partial"),
    "DependencyDeclaration": CanonNodeMapping("service", "model/landscape", "gap"),
    "DeploymentService": CanonNodeMapping("deployment", "model/devsecops", "direct"),
    "DomainName": CanonNodeMapping("endpoint", "model/network", "partial"),
    "ExternalLibrary": CanonNodeMapping("software-system", "model/landscape", "partial"),
    "FabricRegistryEntry": CanonNodeMapping("source-repository", "model/devsecops", "partial"),
    "Fabric": CanonNodeMapping("control", "model/governance", "gap"),
    "FabricActor": CanonNodeMapping("control", "model/governance", "gap"),
    "InterfaceDeclaration": CanonNodeMapping("endpoint", "model/network", "partial"),
    "Library": CanonNodeMapping("software-system", "model/landscape", "partial"),
    "Lockfile": CanonNodeMapping("evidence", "model/observability", "partial"),
    "Netkingdom": CanonNodeMapping("software-system", "model/landscape", "gap"),
    "NetworkPort": CanonNodeMapping("endpoint", "model/network", "direct"),
    "ProfitCenter": CanonNodeMapping("control", "model/governance", "gap"),
    "Repository": CanonNodeMapping("source-repository", "model/devsecops", "direct"),
    "RuntimeService": CanonNodeMapping("runtime-resource", "model/landscape", "direct"),
    "ScoreWorkload": CanonNodeMapping("deployment", "model/devsecops", "direct"),
    "Server": CanonNodeMapping("runtime-resource", "model/landscape", "partial"),
    "ServiceConfig": CanonNodeMapping("evidence", "model/observability", "partial"),
    "ServiceDeclaration": CanonNodeMapping("service", "model/landscape", "direct"),
    "Subfabric": CanonNodeMapping("control", "model/governance", "gap"),
    "UtilityInterface": CanonNodeMapping("endpoint", "model/network", "partial"),
    "CostCenter": CanonNodeMapping("control", "model/governance", "gap"),
}
# Explicit Fabric edge type -> canon mapping table. Each value is
# CanonEdgeMapping(canonical_type, canon_anchor, fit[, display_only=...]).
# Qualified "binds:..." edge types are not listed; edge_canon_mapping()
# resolves them to the plain "binds" entry.
EDGE_TYPE_CANON_MAP: dict[str, CanonEdgeMapping] = {
    "available_via": CanonEdgeMapping("exposes", "model/network", "partial"),
    "attributed_to_cost_center": CanonEdgeMapping("governed_by", "model/governance", "gap"),
    "attributed_to_profit_center": CanonEdgeMapping("governed_by", "model/governance", "gap"),
    "binds": CanonEdgeMapping("depends_on", "model/landscape", "partial"),
    "builds_container": CanonEdgeMapping("built_from", "model/devsecops", "partial"),
    "cataloged_as": CanonEdgeMapping("evidenced_by", "model/observability", "partial"),
    "consumes": CanonEdgeMapping("depends_on", "model/landscape", "partial"),
    "contains": CanonEdgeMapping("part_of", "model/landscape", "partial"),
    "declares": CanonEdgeMapping("part_of", "model/devsecops", "partial", display_only=True),
    "declares_package": CanonEdgeMapping("built_from", "model/devsecops", "partial"),
    "defines_deployment": CanonEdgeMapping("built_from", "model/devsecops", "partial"),
    "defines_runtime_object": CanonEdgeMapping("deploys", "model/devsecops", "partial"),
    "defines_workload": CanonEdgeMapping("deploys", "model/devsecops", "partial"),
    "deployed_as": CanonEdgeMapping("deploys", "model/devsecops", "partial"),
    "depends_on_library": CanonEdgeMapping("depends_on", "model/landscape", "partial"),
    "documents_interface": CanonEdgeMapping("evidenced_by", "model/observability", "partial"),
    "exposes": CanonEdgeMapping("exposes", "model/network", "direct"),
    "exposes_port": CanonEdgeMapping("exposes", "model/network", "direct"),
    "listens_on": CanonEdgeMapping("exposes", "model/network", "direct"),
    "names_endpoint": CanonEdgeMapping("exposes", "model/network", "partial"),
    "opens_port": CanonEdgeMapping("exposes", "model/network", "partial"),
    "operated_by": CanonEdgeMapping("governed_by", "model/governance", "partial"),
    "owned_by": CanonEdgeMapping("governed_by", "model/governance", "partial"),
    "owns_deployment": CanonEdgeMapping("part_of", "model/devsecops", "partial", display_only=True),
    "provides": CanonEdgeMapping("implements", "model/landscape", "partial"),
    "provides_utility_to": CanonEdgeMapping("depends_on", "model/landscape", "partial"),
    "resolves_to": CanonEdgeMapping("flows_to", "model/network", "partial"),
    "routes_to_port": CanonEdgeMapping("flows_to", "model/network", "partial"),
    "routes_to_service": CanonEdgeMapping("flows_to", "model/network", "partial"),
    "runs_on": CanonEdgeMapping("deploys", "model/devsecops", "partial"),
    "suggests_capability": CanonEdgeMapping("creates_task", "model/task", "partial"),
    "uses_config": CanonEdgeMapping("evidenced_by", "model/observability", "partial"),
    "uses_interface": CanonEdgeMapping("depends_on", "model/landscape", "partial"),
    "uses_lockfile": CanonEdgeMapping("evidenced_by", "model/observability", "partial"),
}
def node_canon_mapping(kind: str) -> CanonNodeMapping:
    """Return the canon mapping for a Fabric node kind.

    Resolution order:

    1. an explicit entry in ``NODE_KIND_CANON_MAP``;
    2. any kind starting with ``"Kubernetes"`` maps directly to
       ``runtime-resource`` under ``model/landscape``;
    3. otherwise ``UNKNOWN_NODE_MAPPING``.

    Args:
        kind: Fabric node kind. ``None``, empty and whitespace-padded values
            are tolerated.

    Returns:
        The matching ``CanonNodeMapping``; never raises for unmapped kinds.
    """
    # Normalise exactly like edge_canon_mapping() so a missing kind degrades to
    # the "unknown" mapping instead of raising AttributeError on .startswith().
    normalized = str(kind or "").strip()

    mapping = NODE_KIND_CANON_MAP.get(normalized)
    if mapping is not None:
        return mapping
    if normalized.startswith("Kubernetes"):
        return CanonNodeMapping("runtime-resource", "model/landscape", "direct")
    return UNKNOWN_NODE_MAPPING
def edge_canon_mapping(edge_type: str) -> CanonEdgeMapping:
    """Return the canon mapping for a Fabric edge type.

    The edge type is coerced to ``str`` and stripped, then resolved in order:

    1. anything starting with ``"binds:"`` uses the ``"binds"`` table entry;
    2. an explicit entry in ``EDGE_TYPE_CANON_MAP``;
    3. a type that is itself in ``CANONICAL_EDGE_TYPES`` is a ``"direct"`` fit
       anchored via ``_anchor_for_canonical_edge``;
    4. a type in ``DISPLAY_ONLY_EDGE_TYPES`` is a display-only ``"gap"``;
    5. otherwise ``UNKNOWN_EDGE_MAPPING``.
    """
    key = str(edge_type or "").strip()

    if key.startswith("binds:"):
        return EDGE_TYPE_CANON_MAP["binds"]

    explicit = EDGE_TYPE_CANON_MAP.get(key)
    if explicit is not None:
        return explicit

    if key in CANONICAL_EDGE_TYPES:
        return CanonEdgeMapping(
            canonical_type=key,
            canon_anchor=_anchor_for_canonical_edge(key),
            fit="direct",
        )

    if key in DISPLAY_ONLY_EDGE_TYPES:
        return CanonEdgeMapping(
            canonical_type="",
            canon_anchor="",
            fit="gap",
            display_only=True,
        )

    return UNKNOWN_EDGE_MAPPING
def evidence_state_for(
    *,
    origin: str = "",
    source_kind: str = "",
    review_state: str = "",
    confidence: float | None = None,
) -> str:
    """Classify a fact into one of the evidence states.

    Rules are checked in order and the first match wins:

    1. ``review_state == "rejected"``            -> ``"gap"``
    2. ``origin == "llm"``                       -> ``"proposed"``
    3. ``confidence`` given and below ``0.5``    -> ``"inferred"``
    4. ``source_kind`` is a registry/catalog     -> ``"observed"``
    5. ``source_kind == "llm"``                  -> ``"proposed"``
    6. no ``source_kind``, deterministic origin  -> ``"inferred"``
    7. anything else                             -> ``"declared"``

    All arguments are keyword-only.
    """
    if review_state == "rejected":
        state = "gap"
    elif origin == "llm":
        state = "proposed"
    elif confidence is not None and confidence < 0.5:
        state = "inferred"
    elif source_kind in {
        "package_registry",
        "container_registry",
        "service_catalog",
        "fabric_registry",
    }:
        state = "observed"
    elif source_kind == "llm":
        state = "proposed"
    elif origin == "deterministic" and not source_kind:
        state = "inferred"
    else:
        state = "declared"
    return state
def source_kind_from_anchor(source_anchor: dict[str, Any] | None) -> str:
    """Return the ``"source_kind"`` entry of a source anchor as a string.

    Args:
        source_anchor: Mapping that may carry a ``"source_kind"`` key. ``None``
            or an empty mapping is accepted.

    Returns:
        ``str()`` of the stored value, or ``""`` when the anchor is missing,
        the key is absent, or the stored value is falsy.
    """
    # A missing anchor has no source kind; avoid AttributeError on None.get().
    if not source_anchor:
        return ""
    return str(source_anchor.get("source_kind") or "")
# Canon model section for each canonical edge type. Built once at import time
# rather than on every _anchor_for_canonical_edge() call.
_CANONICAL_EDGE_ANCHORS: dict[str, str] = {
    "built_from": "model/devsecops",
    "implements": "model/security",
    "exposes": "model/network",
    "depends_on": "model/landscape",
    "deploys": "model/devsecops",
    "flows_to": "model/network",
    "governed_by": "model/governance",
    "evidenced_by": "model/observability",
    "observed_by": "model/observability",
    "part_of": "model/landscape",
    "reads_or_writes": "model/data",
    "creates_task": "model/task",
}


def _anchor_for_canonical_edge(edge_type: str) -> str:
    """Return the canon model section for a canonical edge type.

    Args:
        edge_type: Edge type name, e.g. ``"depends_on"``.

    Returns:
        The anchor string such as ``"model/landscape"``, or ``""`` when
        ``edge_type`` has no entry in the anchor table.
    """
    return _CANONICAL_EDGE_ANCHORS.get(edge_type, "")