Files
railiance-fabric/railiance_fabric/financial.py

401 lines
18 KiB
Python

from __future__ import annotations
import json
from typing import Any
# Identifiers that mark a graph export as using the financial fabric schema.
FINANCIAL_API_VERSION = "railiance.fabric/v1alpha2"
FINANCIAL_SCHEMA_VERSION = "financial-fabric-v1"
# Accepted values for an actor's ``role`` and for ``ownership.owner_role``.
ACTOR_ROLES = {"king", "lord", "tenant", "operator", "steward"}
# Accepted values for ``ownership.resolution``.
OWNERSHIP_RESOLUTIONS = {"explicit", "inherited", "unresolved", "ambiguous"}
# Accepted values for ``evidence.state``.
EVIDENCE_STATES = {"observed", "declared", "inferred", "proposed", "gap"}
# Accepted values for ``evidence.review_state``.
REVIEW_STATES = {"accepted", "candidate", "needs_review", "rejected"}
# Accepted values for an edge's ``relationship_category``.
RELATIONSHIP_CATEGORIES = {
"containment",
"ownership",
"technical",
"utility",
"accounting",
"evidence",
}
# Accepted values for a fabric's ``kind``.
FABRIC_KINDS = {"Fabric", "Subfabric"}
# Environment names; validation rejects a fabric id that equals one of these.
ENVIRONMENT_LABELS = {"local", "dev", "staging", "prod", "lab", "all"}
def is_financial_graph_export(graph: dict[str, Any]) -> bool:
    """Tell whether *graph* declares itself as a financial fabric export.

    A graph qualifies when either its ``apiVersion`` or its
    ``schema_version`` equals the corresponding financial constant.
    """
    if graph.get("apiVersion") == FINANCIAL_API_VERSION:
        return True
    return graph.get("schema_version") == FINANCIAL_SCHEMA_VERSION
def materialize_financial_graph_export(graph: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of *graph* with financial-schema defaults filled in.

    The input is never mutated: it is copied through a JSON round-trip
    first. Graphs that are not financial exports are returned as that plain
    copy. Financial exports get their ``apiVersion`` forced to the financial
    value, missing top-level keys defaulted, and every node and edge
    normalized in place on the copy.
    """
    # JSON round-trip: yields an independent copy of the caller's structure.
    copied = json.loads(json.dumps(graph))
    if not is_financial_graph_export(copied):
        return copied

    copied["apiVersion"] = FINANCIAL_API_VERSION
    copied.setdefault("kind", "FabricGraphExport")
    copied.setdefault("schema_version", FINANCIAL_SCHEMA_VERSION)
    copied.setdefault("compatibility", {})
    # A fresh list per key so the defaults never share one object.
    for list_key in ("actors", "fabrics", "nodes", "edges", "unresolved"):
        copied.setdefault(list_key, [])

    for node_entry in _items(copied, "nodes"):
        _materialize_node(node_entry)
    for edge_entry in _items(copied, "edges"):
        _materialize_edge(edge_entry)
    return copied
def financial_graph_errors(graph: dict[str, Any]) -> list[str]:
    """Collect validation errors for a financial fabric graph export.

    Graphs that are not financial exports (see ``is_financial_graph_export``)
    are not validated and yield an empty list.

    Checks run in this order, each appending human-readable messages:
    envelope fields, ``netkingdom``, ``actors``, ``fabrics``, ``nodes``,
    ``edges`` and ``unresolved``.

    A Subfabric's ``parent_fabric_id`` is resolved against every fabric id
    declared in the ``fabrics`` array, regardless of position, so the result
    does not depend on the order in which fabrics are listed.

    Args:
        graph: The graph export to validate.

    Returns:
        A list of error messages; empty when no problem was found.
    """
    if not is_financial_graph_export(graph):
        return []
    errors: list[str] = []

    # Envelope.
    if graph.get("apiVersion") != FINANCIAL_API_VERSION:
        errors.append(f"apiVersion must be {FINANCIAL_API_VERSION!r}")
    if graph.get("kind") != "FabricGraphExport":
        errors.append("kind must be 'FabricGraphExport'")
    if graph.get("schema_version") != FINANCIAL_SCHEMA_VERSION:
        errors.append(f"schema_version must be {FINANCIAL_SCHEMA_VERSION!r}")

    # Netkingdom: fall back to an empty dict so the field checks still run.
    netkingdom = graph.get("netkingdom")
    if not isinstance(netkingdom, dict):
        errors.append("netkingdom must be an object")
        netkingdom = {}
    netkingdom_id = _required_text(errors, "netkingdom.id", netkingdom, "id")
    king_actor_id = _required_text(errors, "netkingdom.king_actor_id", netkingdom, "king_actor_id")

    # Actors: build the id -> role index used by every later reference check.
    actors = _indexed_items(errors, graph, "actors")
    actor_roles: dict[str, str] = {}
    for index, actor in actors:
        actor_id = _required_text(errors, f"actors[{index}].id", actor, "id")
        role = _required_text(errors, f"actors[{index}].role", actor, "role")
        if role and role not in ACTOR_ROLES:
            errors.append(f"actors[{index}].role {role!r} is not valid")
        if actor_id:
            if actor_id in actor_roles:
                errors.append(f"actors[{index}].id {actor_id!r} is duplicated")
            actor_roles[actor_id] = role
    if king_actor_id and actor_roles.get(king_actor_id) != "king":
        errors.append("netkingdom.king_actor_id must reference an actor with role 'king'")

    # Fabrics.
    fabrics = _indexed_items(errors, graph, "fabrics")
    # Every declared fabric id, gathered up front so that a parent reference
    # resolves even when the parent appears later in the array.
    declared_fabric_ids = {_text(fabric.get("id")) for _, fabric in fabrics}
    declared_fabric_ids.discard("")
    fabric_kinds: dict[str, str] = {}
    for index, fabric in fabrics:
        fabric_id = _required_text(errors, f"fabrics[{index}].id", fabric, "id")
        kind = _required_text(errors, f"fabrics[{index}].kind", fabric, "kind")
        if kind and kind not in FABRIC_KINDS:
            errors.append(f"fabrics[{index}].kind {kind!r} is not valid")
        if fabric_id:
            if fabric_id in ENVIRONMENT_LABELS:
                errors.append(f"fabrics[{index}].id must not be an environment label")
            if fabric_id in fabric_kinds:
                errors.append(f"fabrics[{index}].id {fabric_id!r} is duplicated")
            fabric_kinds[fabric_id] = kind
        if _text(fabric.get("netkingdom_id")) != netkingdom_id:
            errors.append(f"fabrics[{index}].netkingdom_id must match netkingdom.id")
        if kind == "Fabric":
            lord_actor_id = _required_text(errors, f"fabrics[{index}].lord_actor_id", fabric, "lord_actor_id")
            if lord_actor_id and actor_roles.get(lord_actor_id) not in {"lord", "king"}:
                errors.append(f"fabrics[{index}].lord_actor_id must reference a lord or king actor")
        if kind == "Subfabric":
            parent_id = _required_text(errors, f"fabrics[{index}].parent_fabric_id", fabric, "parent_fabric_id")
            tenant_actor_id = _required_text(errors, f"fabrics[{index}].tenant_actor_id", fabric, "tenant_actor_id")
            if parent_id and parent_id not in declared_fabric_ids:
                errors.append(f"fabrics[{index}].parent_fabric_id references unknown fabric {parent_id!r}")
            if tenant_actor_id and actor_roles.get(tenant_actor_id) != "tenant":
                errors.append(f"fabrics[{index}].tenant_actor_id must reference a tenant actor")

    # Nodes: containment and ownership are stricter for accepted nodes.
    node_ids = set()
    nodes = _indexed_items(errors, graph, "nodes")
    for index, node in nodes:
        node_id = _required_text(errors, f"nodes[{index}].id", node, "id")
        _required_text(errors, f"nodes[{index}].kind", node, "kind")
        _required_text(errors, f"nodes[{index}].name", node, "name")
        if node_id:
            if node_id in node_ids:
                errors.append(f"nodes[{index}].id {node_id!r} is duplicated")
            node_ids.add(node_id)
        review_state = _validate_evidence(errors, f"nodes[{index}]", node)
        _validate_containment(errors, f"nodes[{index}]", node, netkingdom_id, fabric_kinds, accepted=review_state == "accepted")
        _validate_ownership(errors, f"nodes[{index}]", node, actor_roles, accepted=review_state == "accepted")
        _validate_optional_object(errors, f"nodes[{index}].accounting", node, "accounting")

    # Edges: endpoints must name known nodes; utility edges get extra checks.
    edges = _indexed_items(errors, graph, "edges")
    for index, edge in edges:
        path = f"edges[{index}]"
        source = _required_text(errors, f"{path}.from", edge, "from")
        target = _required_text(errors, f"{path}.to", edge, "to")
        edge_type = _required_text(errors, f"{path}.type", edge, "type")
        if source and source not in node_ids:
            errors.append(f"{path}.from references unknown node {source!r}")
        if target and target not in node_ids:
            errors.append(f"{path}.to references unknown node {target!r}")
        category = _required_text(errors, f"{path}.relationship_category", edge, "relationship_category")
        if category and category not in RELATIONSHIP_CATEGORIES:
            errors.append(f"{path}.relationship_category {category!r} is not valid")
        _validate_evidence(errors, path, edge)
        _validate_optional_object(errors, f"{path}.accounting", edge, "accounting")
        if edge_type == "provides_utility_to" and category != "utility":
            errors.append(f"{path}.relationship_category must be 'utility' for provides_utility_to edges")
        if category == "utility":
            _validate_utility_edge(errors, path, edge, actor_roles, fabric_kinds)

    # Unresolved items only need their three descriptive fields.
    for index, item in _indexed_items(errors, graph, "unresolved"):
        _required_text(errors, f"unresolved[{index}].target_id", item, "target_id")
        _required_text(errors, f"unresolved[{index}].kind", item, "kind")
        _required_text(errors, f"unresolved[{index}].message", item, "message")
    return errors
def merge_financial_graph_exports(graphs: list[dict[str, Any]], *, generated_at: str) -> dict[str, Any]:
    """Combine several graph exports into one financial graph export.

    Each input is materialized first. Actors, fabrics and nodes are keyed by
    their ``id``; edges by ``_edge_identity``; unresolved items by
    ``"<target_id>:<kind>"``. When the same key occurs more than once, the
    entry from the later graph replaces the earlier one. Entries without a
    usable key are dropped. ``netkingdom`` is taken from the first graph
    that carries one. Output lists are sorted by key, and the merged result
    is materialized once more before being returned.

    Args:
        graphs: The exports to merge, in priority order (later wins).
        generated_at: Value stored under ``generated_at`` in the result.
    """
    merged: dict[str, Any] = {
        "apiVersion": FINANCIAL_API_VERSION,
        "kind": "FabricGraphExport",
        "schema_version": FINANCIAL_SCHEMA_VERSION,
        "generated_at": generated_at,
        "source": {"producer": "railiance-fabric", "registry": "registry", "commit": ""},
        "compatibility": {"legacy_v1alpha1_supported": True, "breaking_reset": False},
        "actors": [],
        "fabrics": [],
        "nodes": [],
        "edges": [],
        "unresolved": [],
    }
    # One key -> entry index per output section.
    buckets: dict[str, dict[str, dict[str, Any]]] = {
        "actors": {},
        "fabrics": {},
        "nodes": {},
        "edges": {},
        "unresolved": {},
    }

    for raw_graph in graphs:
        current = materialize_financial_graph_export(raw_graph)
        if "netkingdom" not in merged and "netkingdom" in current:
            merged["netkingdom"] = current["netkingdom"]

        # Sections whose entries are identified by a plain ``id`` field.
        for section in ("actors", "fabrics", "nodes"):
            for entry in _items(current, section):
                entry_id = str(entry.get("id") or "")
                if entry_id:
                    buckets[section][entry_id] = entry

        for edge_entry in _items(current, "edges"):
            buckets["edges"][_edge_identity(edge_entry)] = edge_entry

        for pending in _items(current, "unresolved"):
            target_part = str(pending.get("target_id") or "")
            kind_part = str(pending.get("kind") or "")
            if target_part or kind_part:
                buckets["unresolved"][f"{target_part}:{kind_part}"] = pending

    for section, index in buckets.items():
        merged[section] = [index[key] for key in sorted(index)]
    return materialize_financial_graph_export(merged)
def _materialize_node(node: dict[str, Any]) -> None:
    """Promote legacy attribute data on *node* to top-level fields, in place.

    ``containment``, ``ownership`` and ``accounting`` are copied up from
    ``node["attributes"]`` when the node lacks them and the attribute value
    is a dict. A missing ``evidence`` field is filled from
    ``_legacy_evidence``.
    """
    raw_attrs = node.get("attributes")
    attrs = raw_attrs if isinstance(raw_attrs, dict) else {}
    for field in ("containment", "ownership", "accounting"):
        if field in node:
            continue
        legacy_value = attrs.get(field)
        if isinstance(legacy_value, dict):
            node[field] = legacy_value
    if "evidence" not in node:
        node["evidence"] = _legacy_evidence(node, attrs)
def _materialize_edge(edge: dict[str, Any]) -> None:
    """Fill in derived fields on *edge*, in place.

    ``accounting`` is copied up from ``edge["attributes"]`` when absent and
    the attribute value is a dict. Missing ``relationship_category`` and
    ``evidence`` are derived. For utility edges a ``boundary`` dict is
    ensured, and its two ``crosses_*`` flags are defaulted by comparing the
    provider and consumer ids when both sides are non-empty dicts.
    """
    raw_attrs = edge.get("attributes")
    attrs = raw_attrs if isinstance(raw_attrs, dict) else {}

    if "accounting" not in edge:
        legacy_accounting = attrs.get("accounting")
        if isinstance(legacy_accounting, dict):
            edge["accounting"] = legacy_accounting
    if "relationship_category" not in edge:
        edge["relationship_category"] = _relationship_category(edge)
    if "evidence" not in edge:
        edge["evidence"] = _legacy_evidence(edge, attrs)

    if edge.get("relationship_category") != "utility":
        return

    raw_provider = edge.get("provider")
    raw_consumer = edge.get("consumer")
    provider = raw_provider if isinstance(raw_provider, dict) else {}
    consumer = raw_consumer if isinstance(raw_consumer, dict) else {}
    boundary = edge.setdefault("boundary", {})
    if not isinstance(boundary, dict) or not provider or not consumer:
        return
    # Explicit flags already on the boundary win over the derived ones.
    for flag, id_key in (
        ("crosses_fabric_boundary", "fabric_id"),
        ("crosses_subfabric_boundary", "subfabric_id"),
    ):
        boundary.setdefault(flag, provider.get(id_key) != consumer.get(id_key))
def _legacy_evidence(item: dict[str, Any], attrs: dict[str, Any]) -> dict[str, Any]:
confidence = attrs.get("discovery_confidence")
evidence: dict[str, Any] = {
"state": item.get("evidence_state") or "declared",
"review_state": attrs.get("discovery_review_state") or "accepted",
"refs": attrs.get("discovery_source_anchors") if isinstance(attrs.get("discovery_source_anchors"), list) else [],
}
if isinstance(confidence, (int, float)):
evidence["confidence"] = float(confidence)
return evidence
def _relationship_category(edge: dict[str, Any]) -> str:
edge_type = str(edge.get("type") or "")
if edge_type in {"contains", "part_of"}:
return "containment"
if edge_type in {"owned_by", "operated_by"}:
return "ownership"
if edge_type == "provides_utility_to":
return "utility"
if edge_type in {"attributed_to_cost_center", "attributed_to_profit_center"}:
return "accounting"
if edge_type == "evidenced_by":
return "evidence"
return "technical"
def _validate_evidence(errors: list[str], path: str, item: dict[str, Any]) -> str:
    """Validate ``item["evidence"]`` and return its review state.

    Appends to *errors* when ``evidence`` is not a dict, when ``state`` or
    ``review_state`` is missing, or when either holds a value outside its
    allowed set.

    Returns:
        The stripped ``review_state`` text, or ``""`` when evidence is not a
        dict or the field is missing.
    """
    evidence = item.get("evidence")
    if not isinstance(evidence, dict):
        errors.append(f"{path}.evidence must be an object")
        return ""

    # Both presence checks run before either membership check, which fixes
    # the order of the emitted messages.
    state = _required_text(errors, f"{path}.evidence.state", evidence, "state")
    review_state = _required_text(errors, f"{path}.evidence.review_state", evidence, "review_state")
    for field, value, allowed in (
        ("state", state, EVIDENCE_STATES),
        ("review_state", review_state, REVIEW_STATES),
    ):
        if value and value not in allowed:
            errors.append(f"{path}.evidence.{field} {value!r} is not valid")
    return review_state
def _validate_containment(
    errors: list[str],
    path: str,
    node: dict[str, Any],
    netkingdom_id: str,
    fabric_kinds: dict[str, str],
    *,
    accepted: bool,
) -> None:
    """Validate ``node["containment"]``, appending problems to *errors*.

    A missing or non-dict containment is an error only for accepted nodes.
    Accepted nodes must match the netkingdom id and name a known fabric that
    is not an environment label. A given ``subfabric_id`` must name a fabric
    of kind ``"Subfabric"``, and ``environment`` must not be a fabric id.
    """
    containment = node.get("containment")
    if not isinstance(containment, dict):
        if accepted:
            errors.append(f"{path}.containment must be an object for accepted nodes")
        return

    # NOTE(review): the source this was restored from had lost its
    # indentation; the extent of this ``accepted`` branch is the most
    # plausible reading and should be confirmed against the repository.
    if accepted:
        declared_netkingdom = _text(containment.get("netkingdom_id"))
        if declared_netkingdom != netkingdom_id:
            errors.append(f"{path}.containment.netkingdom_id must match netkingdom.id")
        fabric_id = _required_text(errors, f"{path}.containment.fabric_id", containment, "fabric_id")
        if fabric_id and fabric_id in ENVIRONMENT_LABELS:
            errors.append(f"{path}.containment.fabric_id must not be an environment label")
        if fabric_id and fabric_id not in fabric_kinds:
            errors.append(f"{path}.containment.fabric_id references unknown fabric {fabric_id!r}")

    subfabric_id = _text(containment.get("subfabric_id"))
    if subfabric_id and fabric_kinds.get(subfabric_id) != "Subfabric":
        errors.append(f"{path}.containment.subfabric_id references unknown subfabric {subfabric_id!r}")

    environment = _text(containment.get("environment"))
    if environment and environment in fabric_kinds:
        errors.append(f"{path}.containment.environment must not reference a fabric id")
def _validate_ownership(
    errors: list[str],
    path: str,
    node: dict[str, Any],
    actor_roles: dict[str, str],
    *,
    accepted: bool,
) -> None:
    """Validate ``node["ownership"]``, appending problems to *errors*.

    A missing or non-dict ownership is an error only for accepted nodes.
    Otherwise ``owner_actor_id``, ``owner_role`` and ``resolution`` are
    required, the role and resolution must be allowed values, and the owner
    must be a known actor whose role agrees with ``owner_role``. Accepted
    nodes may not carry an ``unresolved`` or ``ambiguous`` resolution.

    Args:
        errors: List that error messages are appended to.
        path: Prefix used in messages, e.g. ``"nodes[3]"``.
        node: The node being validated.
        actor_roles: Mapping of known actor id to role.
        accepted: Whether the node's review state is ``"accepted"``.
    """
    ownership = node.get("ownership")
    if not isinstance(ownership, dict):
        if accepted:
            errors.append(f"{path}.ownership must be an object for accepted nodes")
        return
    owner_actor_id = _required_text(errors, f"{path}.ownership.owner_actor_id", ownership, "owner_actor_id")
    owner_role = _required_text(errors, f"{path}.ownership.owner_role", ownership, "owner_role")
    resolution = _required_text(errors, f"{path}.ownership.resolution", ownership, "resolution")
    if owner_role and owner_role not in ACTOR_ROLES:
        errors.append(f"{path}.ownership.owner_role {owner_role!r} is not valid")
    if resolution and resolution not in OWNERSHIP_RESOLUTIONS:
        errors.append(f"{path}.ownership.resolution {resolution!r} is not valid")
    if owner_actor_id:
        if owner_actor_id not in actor_roles:
            errors.append(f"{path}.ownership.owner_actor_id references unknown actor {owner_actor_id!r}")
        # Compare roles only for a known actor: an unknown one has already
        # been reported and has no role to disagree with. An actor whose own
        # role is empty is skipped as well.
        elif owner_role and actor_roles[owner_actor_id] not in {owner_role, ""}:
            errors.append(f"{path}.ownership.owner_role does not match referenced actor role")
    if accepted and resolution in {"unresolved", "ambiguous"}:
        errors.append(f"{path}.ownership.resolution must be explicit or inherited for accepted nodes")
def _validate_utility_edge(
    errors: list[str],
    path: str,
    edge: dict[str, Any],
    actor_roles: dict[str, str],
    fabric_kinds: dict[str, str],
) -> None:
    """Validate the extra structure required on a utility edge.

    ``provider``, ``consumer``, ``boundary`` and ``utility`` must all be
    objects. Each side must name a known owner actor and a known fabric, and
    a given ``subfabric_id`` must name a fabric of kind ``"Subfabric"``.
    ``utility.utility_type`` is required and both boundary flags must be
    booleans.
    """
    # Dict order matches the order in which the four errors are reported.
    sections = {
        name: _required_object(errors, f"{path}.{name}", edge, name)
        for name in ("provider", "consumer", "boundary", "utility")
    }

    for side_name in ("provider", "consumer"):
        side = sections[side_name]
        side_path = f"{path}.{side_name}"
        actor_id = _required_text(errors, f"{side_path}.owner_actor_id", side, "owner_actor_id")
        fabric_id = _required_text(errors, f"{side_path}.fabric_id", side, "fabric_id")
        if actor_id and actor_id not in actor_roles:
            errors.append(f"{side_path}.owner_actor_id references unknown actor {actor_id!r}")
        if fabric_id and fabric_id not in fabric_kinds:
            errors.append(f"{side_path}.fabric_id references unknown fabric {fabric_id!r}")
        subfabric_id = _text(side.get("subfabric_id"))
        if subfabric_id and fabric_kinds.get(subfabric_id) != "Subfabric":
            errors.append(f"{side_path}.subfabric_id references unknown subfabric {subfabric_id!r}")

    _required_text(errors, f"{path}.utility.utility_type", sections["utility"], "utility_type")

    boundary = sections["boundary"]
    for flag in ("crosses_fabric_boundary", "crosses_subfabric_boundary"):
        if not isinstance(boundary.get(flag), bool):
            errors.append(f"{path}.boundary.{flag} must be a boolean")
def _validate_optional_object(errors: list[str], path: str, item: dict[str, Any], key: str) -> None:
if key in item and not isinstance(item[key], dict):
errors.append(f"{path} must be an object")
def _required_object(errors: list[str], path: str, item: dict[str, Any], key: str) -> dict[str, Any]:
value = item.get(key)
if not isinstance(value, dict):
errors.append(f"{path} must be an object")
return {}
return value
def _required_text(errors: list[str], path: str, item: dict[str, Any], key: str) -> str:
    """Return ``item[key]`` as stripped text, reporting an error when empty.

    The value is normalized through ``_text``; an empty result appends
    ``"<path> is required"`` to *errors* and is still returned.
    """
    text = _text(item.get(key))
    if text:
        return text
    errors.append(f"{path} is required")
    return text
def _text(value: Any) -> str:
return value.strip() if isinstance(value, str) else ""
def _indexed_items(errors: list[str], graph: dict[str, Any], key: str) -> list[tuple[int, dict[str, Any]]]:
value = graph.get(key, [])
if not isinstance(value, list):
errors.append(f"{key} must be an array")
return []
return [(index, item) for index, item in enumerate(value) if isinstance(item, dict)]
def _items(graph: dict[str, Any], key: str) -> list[dict[str, Any]]:
value = graph.get(key, [])
return [item for item in value if isinstance(item, dict)] if isinstance(value, list) else []
def _edge_identity(edge: dict[str, Any]) -> str:
if edge.get("id"):
return str(edge["id"])
return f"{edge.get('from', '')}:{edge.get('type', '')}:{edge.get('to', '')}"