generated from coulomb/repo-seed
200 lines
7.6 KiB
Python
200 lines
7.6 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Any
|
|
|
|
|
|
CANONICAL_NODE_CATEGORIES = (
|
|
"source-repository",
|
|
"software-system",
|
|
"service",
|
|
"endpoint",
|
|
"deployment",
|
|
"runtime-resource",
|
|
"datastore",
|
|
"flow",
|
|
"policy",
|
|
"control",
|
|
"evidence",
|
|
"task",
|
|
"consumer-purpose",
|
|
"telemetry-signal",
|
|
)
|
|
|
|
CANONICAL_EDGE_TYPES = (
|
|
"built_from",
|
|
"implements",
|
|
"exposes",
|
|
"depends_on",
|
|
"deploys",
|
|
"flows_to",
|
|
"governed_by",
|
|
"evidenced_by",
|
|
"observed_by",
|
|
"part_of",
|
|
"reads_or_writes",
|
|
"creates_task",
|
|
)
|
|
|
|
DISPLAY_ONLY_EDGE_TYPES = (
|
|
"collapsed_into",
|
|
"declares",
|
|
"grouped_with",
|
|
"highlight_path",
|
|
"near",
|
|
"owns_deployment",
|
|
"same_color_group",
|
|
)
|
|
|
|
EVIDENCE_STATES = ("observed", "declared", "inferred", "proposed", "gap")
|
|
MAPPING_FITS = ("direct", "partial", "conflict", "gap", "unknown")
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class CanonNodeMapping:
|
|
category: str
|
|
canon_anchor: str
|
|
fit: str
|
|
notes: str = ""
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class CanonEdgeMapping:
|
|
canonical_type: str
|
|
canon_anchor: str
|
|
fit: str
|
|
display_only: bool = False
|
|
notes: str = ""
|
|
|
|
|
|
UNKNOWN_NODE_MAPPING = CanonNodeMapping(
|
|
category="unknown",
|
|
canon_anchor="",
|
|
fit="gap",
|
|
notes="No canon mapping has been selected for this Fabric node kind yet.",
|
|
)
|
|
|
|
UNKNOWN_EDGE_MAPPING = CanonEdgeMapping(
|
|
canonical_type="",
|
|
canon_anchor="",
|
|
fit="gap",
|
|
notes="No canon mapping has been selected for this Fabric edge type yet.",
|
|
)
|
|
|
|
NODE_KIND_CANON_MAP: dict[str, CanonNodeMapping] = {
|
|
"ApplicationEndpoint": CanonNodeMapping("endpoint", "model/network", "direct"),
|
|
"BindingAssertion": CanonNodeMapping("evidence", "model/observability", "partial"),
|
|
"CapabilityDeclaration": CanonNodeMapping("software-system", "model/landscape", "partial"),
|
|
"ContainerBuild": CanonNodeMapping("deployment", "model/devsecops", "partial"),
|
|
"DependencyDeclaration": CanonNodeMapping("service", "model/landscape", "gap"),
|
|
"DeploymentService": CanonNodeMapping("deployment", "model/devsecops", "direct"),
|
|
"DomainName": CanonNodeMapping("endpoint", "model/network", "partial"),
|
|
"ExternalLibrary": CanonNodeMapping("software-system", "model/landscape", "partial"),
|
|
"FabricRegistryEntry": CanonNodeMapping("source-repository", "model/devsecops", "partial"),
|
|
"InterfaceDeclaration": CanonNodeMapping("endpoint", "model/network", "partial"),
|
|
"Library": CanonNodeMapping("software-system", "model/landscape", "partial"),
|
|
"Lockfile": CanonNodeMapping("evidence", "model/observability", "partial"),
|
|
"NetworkPort": CanonNodeMapping("endpoint", "model/network", "direct"),
|
|
"Repository": CanonNodeMapping("source-repository", "model/devsecops", "direct"),
|
|
"RuntimeService": CanonNodeMapping("runtime-resource", "model/landscape", "direct"),
|
|
"ScoreWorkload": CanonNodeMapping("deployment", "model/devsecops", "direct"),
|
|
"Server": CanonNodeMapping("runtime-resource", "model/landscape", "partial"),
|
|
"ServiceConfig": CanonNodeMapping("evidence", "model/observability", "partial"),
|
|
"ServiceDeclaration": CanonNodeMapping("service", "model/landscape", "direct"),
|
|
}
|
|
|
|
EDGE_TYPE_CANON_MAP: dict[str, CanonEdgeMapping] = {
|
|
"available_via": CanonEdgeMapping("exposes", "model/network", "partial"),
|
|
"binds": CanonEdgeMapping("depends_on", "model/landscape", "partial"),
|
|
"builds_container": CanonEdgeMapping("built_from", "model/devsecops", "partial"),
|
|
"cataloged_as": CanonEdgeMapping("evidenced_by", "model/observability", "partial"),
|
|
"consumes": CanonEdgeMapping("depends_on", "model/landscape", "partial"),
|
|
"declares": CanonEdgeMapping("part_of", "model/devsecops", "partial", display_only=True),
|
|
"declares_package": CanonEdgeMapping("built_from", "model/devsecops", "partial"),
|
|
"defines_deployment": CanonEdgeMapping("built_from", "model/devsecops", "partial"),
|
|
"defines_runtime_object": CanonEdgeMapping("deploys", "model/devsecops", "partial"),
|
|
"defines_workload": CanonEdgeMapping("deploys", "model/devsecops", "partial"),
|
|
"deployed_as": CanonEdgeMapping("deploys", "model/devsecops", "partial"),
|
|
"depends_on_library": CanonEdgeMapping("depends_on", "model/landscape", "partial"),
|
|
"documents_interface": CanonEdgeMapping("evidenced_by", "model/observability", "partial"),
|
|
"exposes": CanonEdgeMapping("exposes", "model/network", "direct"),
|
|
"exposes_port": CanonEdgeMapping("exposes", "model/network", "direct"),
|
|
"listens_on": CanonEdgeMapping("exposes", "model/network", "direct"),
|
|
"names_endpoint": CanonEdgeMapping("exposes", "model/network", "partial"),
|
|
"opens_port": CanonEdgeMapping("exposes", "model/network", "partial"),
|
|
"owns_deployment": CanonEdgeMapping("part_of", "model/devsecops", "partial", display_only=True),
|
|
"provides": CanonEdgeMapping("implements", "model/landscape", "partial"),
|
|
"resolves_to": CanonEdgeMapping("flows_to", "model/network", "partial"),
|
|
"routes_to_port": CanonEdgeMapping("flows_to", "model/network", "partial"),
|
|
"routes_to_service": CanonEdgeMapping("flows_to", "model/network", "partial"),
|
|
"runs_on": CanonEdgeMapping("deploys", "model/devsecops", "partial"),
|
|
"suggests_capability": CanonEdgeMapping("creates_task", "model/task", "partial"),
|
|
"uses_config": CanonEdgeMapping("evidenced_by", "model/observability", "partial"),
|
|
"uses_interface": CanonEdgeMapping("depends_on", "model/landscape", "partial"),
|
|
"uses_lockfile": CanonEdgeMapping("evidenced_by", "model/observability", "partial"),
|
|
}
|
|
|
|
|
|
def node_canon_mapping(kind: str) -> CanonNodeMapping:
|
|
if kind in NODE_KIND_CANON_MAP:
|
|
return NODE_KIND_CANON_MAP[kind]
|
|
if kind.startswith("Kubernetes"):
|
|
return CanonNodeMapping("runtime-resource", "model/landscape", "direct")
|
|
return UNKNOWN_NODE_MAPPING
|
|
|
|
|
|
def edge_canon_mapping(edge_type: str) -> CanonEdgeMapping:
|
|
normalized = str(edge_type or "").strip()
|
|
if normalized.startswith("binds:"):
|
|
return EDGE_TYPE_CANON_MAP["binds"]
|
|
if normalized in EDGE_TYPE_CANON_MAP:
|
|
return EDGE_TYPE_CANON_MAP[normalized]
|
|
if normalized in CANONICAL_EDGE_TYPES:
|
|
return CanonEdgeMapping(normalized, _anchor_for_canonical_edge(normalized), "direct")
|
|
if normalized in DISPLAY_ONLY_EDGE_TYPES:
|
|
return CanonEdgeMapping("", "", "gap", display_only=True)
|
|
return UNKNOWN_EDGE_MAPPING
|
|
|
|
|
|
def evidence_state_for(
|
|
*,
|
|
origin: str = "",
|
|
source_kind: str = "",
|
|
review_state: str = "",
|
|
confidence: float | None = None,
|
|
) -> str:
|
|
if review_state == "rejected":
|
|
return "gap"
|
|
if origin == "llm":
|
|
return "proposed"
|
|
if confidence is not None and confidence < 0.5:
|
|
return "inferred"
|
|
if source_kind in {"package_registry", "container_registry", "service_catalog", "fabric_registry"}:
|
|
return "observed"
|
|
if source_kind in {"llm"}:
|
|
return "proposed"
|
|
if not source_kind and origin == "deterministic":
|
|
return "inferred"
|
|
return "declared"
|
|
|
|
|
|
def source_kind_from_anchor(source_anchor: dict[str, Any]) -> str:
|
|
return str(source_anchor.get("source_kind") or "")
|
|
|
|
|
|
def _anchor_for_canonical_edge(edge_type: str) -> str:
|
|
return {
|
|
"built_from": "model/devsecops",
|
|
"implements": "model/security",
|
|
"exposes": "model/network",
|
|
"depends_on": "model/landscape",
|
|
"deploys": "model/devsecops",
|
|
"flows_to": "model/network",
|
|
"governed_by": "model/governance",
|
|
"evidenced_by": "model/observability",
|
|
"observed_by": "model/observability",
|
|
"part_of": "model/landscape",
|
|
"reads_or_writes": "model/data",
|
|
"creates_task": "model/task",
|
|
}.get(edge_type, "")
|