generated from coulomb/repo-seed
430 lines
20 KiB
Python
430 lines
20 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from typing import Any
|
|
|
|
from .deployment_overlay import normalize_deployment_overlay
|
|
|
|
FINANCIAL_API_VERSION = "railiance.fabric/v1alpha2"
|
|
FINANCIAL_SCHEMA_VERSION = "financial-fabric-v1"
|
|
|
|
ACTOR_ROLES = {"king", "lord", "tenant", "operator", "steward"}
|
|
OWNERSHIP_RESOLUTIONS = {"explicit", "inherited", "unresolved", "ambiguous"}
|
|
EVIDENCE_STATES = {"observed", "declared", "inferred", "proposed", "gap"}
|
|
REVIEW_STATES = {"accepted", "candidate", "needs_review", "rejected"}
|
|
RELATIONSHIP_CATEGORIES = {
|
|
"containment",
|
|
"ownership",
|
|
"technical",
|
|
"utility",
|
|
"accounting",
|
|
"evidence",
|
|
}
|
|
FABRIC_KINDS = {"Fabric", "Subfabric"}
|
|
ENVIRONMENT_LABELS = {"local", "dev", "test", "staging", "prod", "lab", "all"}
|
|
|
|
|
|
def is_financial_graph_export(graph: dict[str, Any]) -> bool:
|
|
return (
|
|
graph.get("apiVersion") == FINANCIAL_API_VERSION
|
|
or graph.get("schema_version") == FINANCIAL_SCHEMA_VERSION
|
|
)
|
|
|
|
|
|
def materialize_financial_graph_export(graph: dict[str, Any]) -> dict[str, Any]:
|
|
result = json.loads(json.dumps(graph))
|
|
if not is_financial_graph_export(result):
|
|
return result
|
|
|
|
result["apiVersion"] = FINANCIAL_API_VERSION
|
|
result.setdefault("kind", "FabricGraphExport")
|
|
result.setdefault("schema_version", FINANCIAL_SCHEMA_VERSION)
|
|
result.setdefault("compatibility", {})
|
|
result.setdefault("actors", [])
|
|
result.setdefault("fabrics", [])
|
|
result.setdefault("nodes", [])
|
|
result.setdefault("edges", [])
|
|
result.setdefault("unresolved", [])
|
|
|
|
for node in _items(result, "nodes"):
|
|
_materialize_node(node)
|
|
for edge in _items(result, "edges"):
|
|
_materialize_edge(edge)
|
|
return result
|
|
|
|
|
|
def financial_graph_errors(graph: dict[str, Any]) -> list[str]:
|
|
if not is_financial_graph_export(graph):
|
|
return []
|
|
|
|
errors: list[str] = []
|
|
if graph.get("apiVersion") != FINANCIAL_API_VERSION:
|
|
errors.append(f"apiVersion must be {FINANCIAL_API_VERSION!r}")
|
|
if graph.get("kind") != "FabricGraphExport":
|
|
errors.append("kind must be 'FabricGraphExport'")
|
|
if graph.get("schema_version") != FINANCIAL_SCHEMA_VERSION:
|
|
errors.append(f"schema_version must be {FINANCIAL_SCHEMA_VERSION!r}")
|
|
|
|
netkingdom = graph.get("netkingdom")
|
|
if not isinstance(netkingdom, dict):
|
|
errors.append("netkingdom must be an object")
|
|
netkingdom = {}
|
|
netkingdom_id = _required_text(errors, "netkingdom.id", netkingdom, "id")
|
|
king_actor_id = _required_text(errors, "netkingdom.king_actor_id", netkingdom, "king_actor_id")
|
|
|
|
actors = _indexed_items(errors, graph, "actors")
|
|
actor_roles: dict[str, str] = {}
|
|
for index, actor in actors:
|
|
actor_id = _required_text(errors, f"actors[{index}].id", actor, "id")
|
|
role = _required_text(errors, f"actors[{index}].role", actor, "role")
|
|
if role and role not in ACTOR_ROLES:
|
|
errors.append(f"actors[{index}].role {role!r} is not valid")
|
|
if actor_id:
|
|
if actor_id in actor_roles:
|
|
errors.append(f"actors[{index}].id {actor_id!r} is duplicated")
|
|
actor_roles[actor_id] = role
|
|
if king_actor_id and actor_roles.get(king_actor_id) != "king":
|
|
errors.append("netkingdom.king_actor_id must reference an actor with role 'king'")
|
|
|
|
fabrics = _indexed_items(errors, graph, "fabrics")
|
|
fabric_kinds: dict[str, str] = {}
|
|
for index, fabric in fabrics:
|
|
fabric_id = _required_text(errors, f"fabrics[{index}].id", fabric, "id")
|
|
kind = _required_text(errors, f"fabrics[{index}].kind", fabric, "kind")
|
|
if kind and kind not in FABRIC_KINDS:
|
|
errors.append(f"fabrics[{index}].kind {kind!r} is not valid")
|
|
if fabric_id:
|
|
if fabric_id in ENVIRONMENT_LABELS:
|
|
errors.append(f"fabrics[{index}].id must not be an environment label")
|
|
if fabric_id in fabric_kinds:
|
|
errors.append(f"fabrics[{index}].id {fabric_id!r} is duplicated")
|
|
fabric_kinds[fabric_id] = kind
|
|
if _text(fabric.get("netkingdom_id")) != netkingdom_id:
|
|
errors.append(f"fabrics[{index}].netkingdom_id must match netkingdom.id")
|
|
if kind == "Fabric":
|
|
lord_actor_id = _required_text(errors, f"fabrics[{index}].lord_actor_id", fabric, "lord_actor_id")
|
|
if lord_actor_id and actor_roles.get(lord_actor_id) not in {"lord", "king"}:
|
|
errors.append(f"fabrics[{index}].lord_actor_id must reference a lord or king actor")
|
|
if kind == "Subfabric":
|
|
parent_id = _required_text(errors, f"fabrics[{index}].parent_fabric_id", fabric, "parent_fabric_id")
|
|
tenant_actor_id = _required_text(errors, f"fabrics[{index}].tenant_actor_id", fabric, "tenant_actor_id")
|
|
if parent_id and parent_id not in fabric_kinds:
|
|
errors.append(f"fabrics[{index}].parent_fabric_id references unknown fabric {parent_id!r}")
|
|
if tenant_actor_id and actor_roles.get(tenant_actor_id) != "tenant":
|
|
errors.append(f"fabrics[{index}].tenant_actor_id must reference a tenant actor")
|
|
|
|
node_ids = set()
|
|
nodes = _indexed_items(errors, graph, "nodes")
|
|
for index, node in nodes:
|
|
node_id = _required_text(errors, f"nodes[{index}].id", node, "id")
|
|
_required_text(errors, f"nodes[{index}].kind", node, "kind")
|
|
_required_text(errors, f"nodes[{index}].name", node, "name")
|
|
if node_id:
|
|
if node_id in node_ids:
|
|
errors.append(f"nodes[{index}].id {node_id!r} is duplicated")
|
|
node_ids.add(node_id)
|
|
review_state = _validate_evidence(errors, f"nodes[{index}]", node)
|
|
_validate_containment(errors, f"nodes[{index}]", node, netkingdom_id, fabric_kinds, accepted=review_state == "accepted")
|
|
_validate_ownership(errors, f"nodes[{index}]", node, actor_roles, accepted=review_state == "accepted")
|
|
_validate_optional_object(errors, f"nodes[{index}].accounting", node, "accounting")
|
|
_validate_overlay(errors, f"nodes[{index}].deployment_overlay", node)
|
|
|
|
edges = _indexed_items(errors, graph, "edges")
|
|
for index, edge in edges:
|
|
path = f"edges[{index}]"
|
|
source = _required_text(errors, f"{path}.from", edge, "from")
|
|
target = _required_text(errors, f"{path}.to", edge, "to")
|
|
edge_type = _required_text(errors, f"{path}.type", edge, "type")
|
|
if source and source not in node_ids:
|
|
errors.append(f"{path}.from references unknown node {source!r}")
|
|
if target and target not in node_ids:
|
|
errors.append(f"{path}.to references unknown node {target!r}")
|
|
category = _required_text(errors, f"{path}.relationship_category", edge, "relationship_category")
|
|
if category and category not in RELATIONSHIP_CATEGORIES:
|
|
errors.append(f"{path}.relationship_category {category!r} is not valid")
|
|
_validate_evidence(errors, path, edge)
|
|
_validate_optional_object(errors, f"{path}.accounting", edge, "accounting")
|
|
_validate_overlay(errors, f"{path}.deployment_overlay", edge)
|
|
if edge_type == "provides_utility_to" and category != "utility":
|
|
errors.append(f"{path}.relationship_category must be 'utility' for provides_utility_to edges")
|
|
if category == "utility":
|
|
_validate_utility_edge(errors, path, edge, actor_roles, fabric_kinds)
|
|
|
|
for index, item in _indexed_items(errors, graph, "unresolved"):
|
|
_required_text(errors, f"unresolved[{index}].target_id", item, "target_id")
|
|
_required_text(errors, f"unresolved[{index}].kind", item, "kind")
|
|
_required_text(errors, f"unresolved[{index}].message", item, "message")
|
|
|
|
return errors
|
|
|
|
|
|
def merge_financial_graph_exports(graphs: list[dict[str, Any]], *, generated_at: str) -> dict[str, Any]:
|
|
merged: dict[str, Any] = {
|
|
"apiVersion": FINANCIAL_API_VERSION,
|
|
"kind": "FabricGraphExport",
|
|
"schema_version": FINANCIAL_SCHEMA_VERSION,
|
|
"generated_at": generated_at,
|
|
"source": {"producer": "railiance-fabric", "registry": "registry", "commit": ""},
|
|
"compatibility": {"legacy_v1alpha1_supported": True, "breaking_reset": False},
|
|
"actors": [],
|
|
"fabrics": [],
|
|
"nodes": [],
|
|
"edges": [],
|
|
"unresolved": [],
|
|
}
|
|
actors: dict[str, dict[str, Any]] = {}
|
|
fabrics: dict[str, dict[str, Any]] = {}
|
|
nodes: dict[str, dict[str, Any]] = {}
|
|
edges: dict[str, dict[str, Any]] = {}
|
|
unresolved: dict[str, dict[str, Any]] = {}
|
|
|
|
for graph in graphs:
|
|
graph = materialize_financial_graph_export(graph)
|
|
if "netkingdom" in graph and "netkingdom" not in merged:
|
|
merged["netkingdom"] = graph["netkingdom"]
|
|
for actor in _items(graph, "actors"):
|
|
actor_id = str(actor.get("id") or "")
|
|
if actor_id:
|
|
actors[actor_id] = actor
|
|
for fabric in _items(graph, "fabrics"):
|
|
fabric_id = str(fabric.get("id") or "")
|
|
if fabric_id:
|
|
fabrics[fabric_id] = fabric
|
|
for node in _items(graph, "nodes"):
|
|
node_id = str(node.get("id") or "")
|
|
if node_id:
|
|
nodes[node_id] = node
|
|
for edge in _items(graph, "edges"):
|
|
edges[_edge_identity(edge)] = edge
|
|
for item in _items(graph, "unresolved"):
|
|
target = str(item.get("target_id") or "")
|
|
kind = str(item.get("kind") or "")
|
|
if target or kind:
|
|
unresolved[f"{target}:{kind}"] = item
|
|
|
|
merged["actors"] = [actors[key] for key in sorted(actors)]
|
|
merged["fabrics"] = [fabrics[key] for key in sorted(fabrics)]
|
|
merged["nodes"] = [nodes[key] for key in sorted(nodes)]
|
|
merged["edges"] = [edges[key] for key in sorted(edges)]
|
|
merged["unresolved"] = [unresolved[key] for key in sorted(unresolved)]
|
|
return materialize_financial_graph_export(merged)
|
|
|
|
|
|
def _materialize_node(node: dict[str, Any]) -> None:
|
|
attrs = node.get("attributes") if isinstance(node.get("attributes"), dict) else {}
|
|
for key in ("containment", "ownership", "accounting"):
|
|
if key not in node and isinstance(attrs.get(key), dict):
|
|
node[key] = attrs[key]
|
|
overlay = normalize_deployment_overlay(
|
|
node.get("deployment_overlay") if isinstance(node.get("deployment_overlay"), dict) else {},
|
|
attrs,
|
|
node.get("containment") if isinstance(node.get("containment"), dict) else {},
|
|
)
|
|
if overlay:
|
|
node["deployment_overlay"] = overlay
|
|
node.setdefault("evidence", _legacy_evidence(node, attrs))
|
|
|
|
|
|
def _materialize_edge(edge: dict[str, Any]) -> None:
|
|
attrs = edge.get("attributes") if isinstance(edge.get("attributes"), dict) else {}
|
|
if "accounting" not in edge and isinstance(attrs.get("accounting"), dict):
|
|
edge["accounting"] = attrs["accounting"]
|
|
overlay = normalize_deployment_overlay(
|
|
edge.get("deployment_overlay") if isinstance(edge.get("deployment_overlay"), dict) else {},
|
|
attrs,
|
|
)
|
|
if overlay:
|
|
edge["deployment_overlay"] = overlay
|
|
edge.setdefault("relationship_category", _relationship_category(edge))
|
|
edge.setdefault("evidence", _legacy_evidence(edge, attrs))
|
|
if edge.get("relationship_category") == "utility":
|
|
provider = edge.get("provider") if isinstance(edge.get("provider"), dict) else {}
|
|
consumer = edge.get("consumer") if isinstance(edge.get("consumer"), dict) else {}
|
|
boundary = edge.setdefault("boundary", {})
|
|
if isinstance(boundary, dict) and provider and consumer:
|
|
boundary.setdefault("crosses_fabric_boundary", provider.get("fabric_id") != consumer.get("fabric_id"))
|
|
boundary.setdefault("crosses_subfabric_boundary", provider.get("subfabric_id") != consumer.get("subfabric_id"))
|
|
|
|
|
|
def _legacy_evidence(item: dict[str, Any], attrs: dict[str, Any]) -> dict[str, Any]:
|
|
confidence = attrs.get("discovery_confidence")
|
|
evidence: dict[str, Any] = {
|
|
"state": item.get("evidence_state") or "declared",
|
|
"review_state": attrs.get("discovery_review_state") or "accepted",
|
|
"refs": attrs.get("discovery_source_anchors") if isinstance(attrs.get("discovery_source_anchors"), list) else [],
|
|
}
|
|
if isinstance(confidence, (int, float)):
|
|
evidence["confidence"] = float(confidence)
|
|
return evidence
|
|
|
|
|
|
def _relationship_category(edge: dict[str, Any]) -> str:
|
|
edge_type = str(edge.get("type") or "")
|
|
if edge_type in {"contains", "part_of"}:
|
|
return "containment"
|
|
if edge_type in {"owned_by", "operated_by"}:
|
|
return "ownership"
|
|
if edge_type == "provides_utility_to":
|
|
return "utility"
|
|
if edge_type in {"attributed_to_cost_center", "attributed_to_profit_center"}:
|
|
return "accounting"
|
|
if edge_type == "evidenced_by":
|
|
return "evidence"
|
|
return "technical"
|
|
|
|
|
|
def _validate_evidence(errors: list[str], path: str, item: dict[str, Any]) -> str:
|
|
evidence = item.get("evidence")
|
|
if not isinstance(evidence, dict):
|
|
errors.append(f"{path}.evidence must be an object")
|
|
return ""
|
|
state = _required_text(errors, f"{path}.evidence.state", evidence, "state")
|
|
review_state = _required_text(errors, f"{path}.evidence.review_state", evidence, "review_state")
|
|
if state and state not in EVIDENCE_STATES:
|
|
errors.append(f"{path}.evidence.state {state!r} is not valid")
|
|
if review_state and review_state not in REVIEW_STATES:
|
|
errors.append(f"{path}.evidence.review_state {review_state!r} is not valid")
|
|
return review_state
|
|
|
|
|
|
def _validate_containment(
|
|
errors: list[str],
|
|
path: str,
|
|
node: dict[str, Any],
|
|
netkingdom_id: str,
|
|
fabric_kinds: dict[str, str],
|
|
*,
|
|
accepted: bool,
|
|
) -> None:
|
|
containment = node.get("containment")
|
|
if not isinstance(containment, dict):
|
|
if accepted:
|
|
errors.append(f"{path}.containment must be an object for accepted nodes")
|
|
return
|
|
if accepted:
|
|
if _text(containment.get("netkingdom_id")) != netkingdom_id:
|
|
errors.append(f"{path}.containment.netkingdom_id must match netkingdom.id")
|
|
fabric_id = _required_text(errors, f"{path}.containment.fabric_id", containment, "fabric_id")
|
|
if fabric_id:
|
|
if fabric_id in ENVIRONMENT_LABELS:
|
|
errors.append(f"{path}.containment.fabric_id must not be an environment label")
|
|
if fabric_id not in fabric_kinds:
|
|
errors.append(f"{path}.containment.fabric_id references unknown fabric {fabric_id!r}")
|
|
subfabric_id = _text(containment.get("subfabric_id"))
|
|
if subfabric_id:
|
|
if fabric_kinds.get(subfabric_id) != "Subfabric":
|
|
errors.append(f"{path}.containment.subfabric_id references unknown subfabric {subfabric_id!r}")
|
|
environment = _text(containment.get("environment"))
|
|
if environment and environment in fabric_kinds:
|
|
errors.append(f"{path}.containment.environment must not reference a fabric id")
|
|
|
|
|
|
def _validate_ownership(
|
|
errors: list[str],
|
|
path: str,
|
|
node: dict[str, Any],
|
|
actor_roles: dict[str, str],
|
|
*,
|
|
accepted: bool,
|
|
) -> None:
|
|
ownership = node.get("ownership")
|
|
if not isinstance(ownership, dict):
|
|
if accepted:
|
|
errors.append(f"{path}.ownership must be an object for accepted nodes")
|
|
return
|
|
owner_actor_id = _required_text(errors, f"{path}.ownership.owner_actor_id", ownership, "owner_actor_id")
|
|
owner_role = _required_text(errors, f"{path}.ownership.owner_role", ownership, "owner_role")
|
|
resolution = _required_text(errors, f"{path}.ownership.resolution", ownership, "resolution")
|
|
if owner_role and owner_role not in ACTOR_ROLES:
|
|
errors.append(f"{path}.ownership.owner_role {owner_role!r} is not valid")
|
|
if resolution and resolution not in OWNERSHIP_RESOLUTIONS:
|
|
errors.append(f"{path}.ownership.resolution {resolution!r} is not valid")
|
|
if owner_actor_id and owner_actor_id not in actor_roles:
|
|
errors.append(f"{path}.ownership.owner_actor_id references unknown actor {owner_actor_id!r}")
|
|
if owner_actor_id and owner_role and actor_roles.get(owner_actor_id) not in {owner_role, ""}:
|
|
errors.append(f"{path}.ownership.owner_role does not match referenced actor role")
|
|
if accepted and resolution in {"unresolved", "ambiguous"}:
|
|
errors.append(f"{path}.ownership.resolution must be explicit or inherited for accepted nodes")
|
|
|
|
|
|
def _validate_utility_edge(
|
|
errors: list[str],
|
|
path: str,
|
|
edge: dict[str, Any],
|
|
actor_roles: dict[str, str],
|
|
fabric_kinds: dict[str, str],
|
|
) -> None:
|
|
provider = _required_object(errors, f"{path}.provider", edge, "provider")
|
|
consumer = _required_object(errors, f"{path}.consumer", edge, "consumer")
|
|
boundary = _required_object(errors, f"{path}.boundary", edge, "boundary")
|
|
utility = _required_object(errors, f"{path}.utility", edge, "utility")
|
|
for side_name, side in (("provider", provider), ("consumer", consumer)):
|
|
actor_id = _required_text(errors, f"{path}.{side_name}.owner_actor_id", side, "owner_actor_id")
|
|
fabric_id = _required_text(errors, f"{path}.{side_name}.fabric_id", side, "fabric_id")
|
|
if actor_id and actor_id not in actor_roles:
|
|
errors.append(f"{path}.{side_name}.owner_actor_id references unknown actor {actor_id!r}")
|
|
if fabric_id and fabric_id not in fabric_kinds:
|
|
errors.append(f"{path}.{side_name}.fabric_id references unknown fabric {fabric_id!r}")
|
|
subfabric_id = _text(side.get("subfabric_id"))
|
|
if subfabric_id and fabric_kinds.get(subfabric_id) != "Subfabric":
|
|
errors.append(f"{path}.{side_name}.subfabric_id references unknown subfabric {subfabric_id!r}")
|
|
_required_text(errors, f"{path}.utility.utility_type", utility, "utility_type")
|
|
for key in ("crosses_fabric_boundary", "crosses_subfabric_boundary"):
|
|
if not isinstance(boundary.get(key), bool):
|
|
errors.append(f"{path}.boundary.{key} must be a boolean")
|
|
|
|
|
|
def _validate_optional_object(errors: list[str], path: str, item: dict[str, Any], key: str) -> None:
|
|
if key in item and not isinstance(item[key], dict):
|
|
errors.append(f"{path} must be an object")
|
|
|
|
|
|
def _validate_overlay(errors: list[str], path: str, item: dict[str, Any]) -> None:
|
|
overlay = item.get("deployment_overlay")
|
|
if overlay is None:
|
|
return
|
|
if not isinstance(overlay, dict):
|
|
errors.append(f"{path} must be an object")
|
|
return
|
|
route = overlay.get("route_evidence")
|
|
if route is not None and not isinstance(route, dict):
|
|
errors.append(f"{path}.route_evidence must be an object")
|
|
|
|
|
|
def _required_object(errors: list[str], path: str, item: dict[str, Any], key: str) -> dict[str, Any]:
|
|
value = item.get(key)
|
|
if not isinstance(value, dict):
|
|
errors.append(f"{path} must be an object")
|
|
return {}
|
|
return value
|
|
|
|
|
|
def _required_text(errors: list[str], path: str, item: dict[str, Any], key: str) -> str:
|
|
value = _text(item.get(key))
|
|
if not value:
|
|
errors.append(f"{path} is required")
|
|
return value
|
|
|
|
|
|
def _text(value: Any) -> str:
|
|
return value.strip() if isinstance(value, str) else ""
|
|
|
|
|
|
def _indexed_items(errors: list[str], graph: dict[str, Any], key: str) -> list[tuple[int, dict[str, Any]]]:
|
|
value = graph.get(key, [])
|
|
if not isinstance(value, list):
|
|
errors.append(f"{key} must be an array")
|
|
return []
|
|
return [(index, item) for index, item in enumerate(value) if isinstance(item, dict)]
|
|
|
|
|
|
def _items(graph: dict[str, Any], key: str) -> list[dict[str, Any]]:
|
|
value = graph.get(key, [])
|
|
return [item for item in value if isinstance(item, dict)] if isinstance(value, list) else []
|
|
|
|
|
|
def _edge_identity(edge: dict[str, Any]) -> str:
|
|
if edge.get("id"):
|
|
return str(edge["id"])
|
|
return f"{edge.get('from', '')}:{edge.get('type', '')}:{edge.get('to', '')}"
|