diff --git a/railiance_fabric/zone_view.py b/railiance_fabric/zone_view.py new file mode 100644 index 0000000..ff71781 --- /dev/null +++ b/railiance_fabric/zone_view.py @@ -0,0 +1,661 @@ +from __future__ import annotations + +from collections import defaultdict, deque +from collections.abc import Iterable, Mapping +from dataclasses import dataclass, field +from typing import Any + + +@dataclass(frozen=True) +class ZoneAttractionRule: + edge_type: str | None = None + direction: str = "both" + depth: int = 1 + node_filter: Mapping[str, Any] = field(default_factory=dict) + edge_filter: Mapping[str, Any] = field(default_factory=dict) + + def __post_init__(self) -> None: + if self.direction not in {"out", "in", "both"}: + raise ValueError(f"Unsupported attraction direction: {self.direction}") + if self.depth < 0: + raise ValueError("Attraction depth must be zero or greater") + + @classmethod + def from_dict(cls, data: Mapping[str, Any]) -> ZoneAttractionRule: + return cls( + edge_type=_optional_string(data.get("edge_type", data.get("edgeType"))), + direction=str(data.get("direction") or "both"), + depth=int(data.get("depth", 1)), + node_filter=_mapping_or_empty(data.get("node_filter", data.get("nodeFilter"))), + edge_filter=_mapping_or_empty(data.get("edge_filter", data.get("edgeFilter"))), + ) + + def to_dict(self) -> dict[str, Any]: + result: dict[str, Any] = { + "direction": self.direction, + "depth": self.depth, + } + if self.edge_type: + result["edge_type"] = self.edge_type + if self.node_filter: + result["node_filter"] = dict(self.node_filter) + if self.edge_filter: + result["edge_filter"] = dict(self.edge_filter) + return result + + +@dataclass(frozen=True) +class ZoneLayout: + algorithm: str = "preset" + options: Mapping[str, Any] = field(default_factory=dict) + + @classmethod + def from_dict(cls, data: Mapping[str, Any] | None) -> ZoneLayout: + if not data: + return cls() + return cls( + algorithm=str(data.get("algorithm") or "preset"), + options=_mapping_or_empty(data.get("options")), + ) + + def to_dict(self) -> dict[str, Any]: + return {"algorithm": self.algorithm, "options": dict(self.options)} + + +@dataclass(frozen=True) +class ZonePresentation: + height: int = 0 + color: str = "" + opacity: float = 0.16 + blur_below: bool = False + + @classmethod + def from_dict(cls, data: Mapping[str, Any] | None) -> ZonePresentation: + if not data: + return cls() + return cls( + height=int(data.get("height", 0)), + color=str(data.get("color") or ""), + opacity=float(data.get("opacity", 0.16)), + blur_below=bool(data.get("blur_below", data.get("blurBelow", False))), + ) + + def to_dict(self) -> dict[str, Any]: + return { + "height": self.height, + "color": self.color, + "opacity": self.opacity, + "blur_below": self.blur_below, + } + + +@dataclass(frozen=True) +class ZoneCollapse: + enabled: bool = False + label: str = "" + + @classmethod + def from_dict(cls, data: Mapping[str, Any] | None) -> ZoneCollapse: + if not data: + return cls() + return cls( + enabled=bool(data.get("enabled", False)), + label=str(data.get("label") or ""), + ) + + def to_dict(self) -> dict[str, Any]: + return {"enabled": self.enabled, "label": self.label} + + +@dataclass(frozen=True) +class ZoneDefinition: + id: str + label: str = "" + enabled: bool = True + membership: Mapping[str, Any] = field(default_factory=dict) + attraction_rules: tuple[ZoneAttractionRule, ...] = () + layout: ZoneLayout = field(default_factory=ZoneLayout) + presentation: ZonePresentation = field(default_factory=ZonePresentation) + collapse: ZoneCollapse = field(default_factory=ZoneCollapse) + + @classmethod + def from_dict(cls, data: Mapping[str, Any]) -> ZoneDefinition: + attraction = data.get("attraction", ()) + if isinstance(attraction, Mapping): + raw_attraction_rules = attraction.get("rules", ()) + else: + raw_attraction_rules = attraction + rules = tuple( + rule if isinstance(rule, ZoneAttractionRule) else ZoneAttractionRule.from_dict(rule) + for rule in raw_attraction_rules + if isinstance(rule, Mapping) or isinstance(rule, ZoneAttractionRule) + ) + zone_id = str(data["id"]) + return cls( + id=zone_id, + label=str(data.get("label") or zone_id), + enabled=bool(data.get("enabled", True)), + membership=_mapping_or_empty(data.get("membership")), + attraction_rules=rules, + layout=ZoneLayout.from_dict(_mapping_or_none(data.get("layout"))), + presentation=ZonePresentation.from_dict(_mapping_or_none(data.get("presentation"))), + collapse=ZoneCollapse.from_dict(_mapping_or_none(data.get("collapse"))), + ) + + def to_dict(self) -> dict[str, Any]: + return { + "id": self.id, + "label": self.label or self.id, + "enabled": self.enabled, + "membership": dict(self.membership), + "attraction": {"rules": [rule.to_dict() for rule in self.attraction_rules]}, + "layout": self.layout.to_dict(), + "presentation": self.presentation.to_dict(), + "collapse": self.collapse.to_dict(), + } + + +@dataclass(frozen=True) +class ZoneNodeAssignment: + node_id: str + zone_id: str + reason: str + depth: int = 0 + + def to_dict(self) -> dict[str, Any]: + return { + "node_id": self.node_id, + "zone_id": self.zone_id, + "reason": self.reason, + "depth": self.depth, + } + + +@dataclass(frozen=True) +class ZoneBoundaryEdge: + edge_id: str + source: str + target: str + edge_type: str + source_zone_id: str | None + target_zone_id: str | None + + def to_dict(self) -> dict[str, Any]: + return { + "edge_id": self.edge_id, + "source": self.source, + "target": self.target, + "edge_type": self.edge_type, + "source_zone_id": self.source_zone_id, + "target_zone_id": self.target_zone_id, + } + + +@dataclass(frozen=True) +class ZoneDiagnostic: + severity: str + code: str + message: str + node_id: str | None = None + edge_id: str | None = None + zone_ids: tuple[str, ...] = () + + def to_dict(self) -> dict[str, Any]: + result: dict[str, Any] = { + "severity": self.severity, + "code": self.code, + "message": self.message, + "zone_ids": list(self.zone_ids), + } + if self.node_id: + result["node_id"] = self.node_id + if self.edge_id: + result["edge_id"] = self.edge_id + return result + + +@dataclass(frozen=True) +class ZoneInstance: + id: str + label: str + definition: ZoneDefinition + node_ids: tuple[str, ...] + seed_node_ids: tuple[str, ...] + attracted_node_ids: tuple[str, ...] + internal_edge_ids: tuple[str, ...] + boundary_edge_ids: tuple[str, ...] + diagnostics: tuple[ZoneDiagnostic, ...] + + def to_dict(self) -> dict[str, Any]: + return { + "id": self.id, + "label": self.label, + "definition": self.definition.to_dict(), + "node_ids": list(self.node_ids), + "seed_node_ids": list(self.seed_node_ids), + "attracted_node_ids": list(self.attracted_node_ids), + "internal_edge_ids": list(self.internal_edge_ids), + "boundary_edge_ids": list(self.boundary_edge_ids), + "diagnostics": [diagnostic.to_dict() for diagnostic in self.diagnostics], + } + + +@dataclass(frozen=True) +class ZoneResolution: + zones: tuple[ZoneInstance, ...] + node_assignments: Mapping[str, ZoneNodeAssignment] + boundary_edges: tuple[ZoneBoundaryEdge, ...] + diagnostics: tuple[ZoneDiagnostic, ...] + + def zone_by_id(self, zone_id: str) -> ZoneInstance | None: + return next((zone for zone in self.zones if zone.id == zone_id), None) + + def to_dict(self) -> dict[str, Any]: + return { + "zones": [zone.to_dict() for zone in self.zones], + "node_assignments": { + node_id: assignment.to_dict() + for node_id, assignment in self.node_assignments.items() + }, + "boundary_edges": [edge.to_dict() for edge in self.boundary_edges], + "diagnostics": [diagnostic.to_dict() for diagnostic in self.diagnostics], + } + + +@dataclass(frozen=True) +class _NodeRecord: + id: str + data: Mapping[str, Any] + order: int + + +@dataclass(frozen=True) +class _EdgeRecord: + id: str + source: str + target: str + edge_type: str + data: Mapping[str, Any] + order: int + + +@dataclass(frozen=True) +class _Candidate: + node_id: str + zone_id: str + reason: str + depth: int + definition_order: int + + +def resolve_zones( + nodes: Iterable[Mapping[str, Any]], + edges: Iterable[Mapping[str, Any]], + zone_definitions: Iterable[ZoneDefinition | Mapping[str, Any]], +) -> ZoneResolution: + """Resolve zone definitions against graph data without mutating the graph.""" + + node_records = _node_records(nodes) + edge_records = _edge_records(edges) + definitions = tuple( + definition if isinstance(definition, ZoneDefinition) else ZoneDefinition.from_dict(definition) + for definition in zone_definitions + ) + enabled_definitions = tuple( + (index, definition) + for index, definition in enumerate(definitions) + if definition.enabled + ) + diagnostics: list[ZoneDiagnostic] = [] + candidates_by_node_id: dict[str, list[_Candidate]] = defaultdict(list) + seed_node_ids_by_zone_id: dict[str, set[str]] = defaultdict(set) + + for definition_order, definition in enabled_definitions: + for node in node_records: + if _rule_matches(node.data, definition.membership, empty_matches=False): + seed_node_ids_by_zone_id[definition.id].add(node.id) + candidates_by_node_id[node.id].append( + _Candidate(node.id, definition.id, "seed", 0, definition_order) + ) + if not seed_node_ids_by_zone_id[definition.id]: + diagnostics.append( + ZoneDiagnostic( + severity="WARN", + code="ZONE_EMPTY_SEED_SET", + message=f"Zone {definition.id} did not match any seed nodes.", + zone_ids=(definition.id,), + ) + ) + + attraction_candidates = _attraction_candidates( + node_records, + edge_records, + enabled_definitions, + seed_node_ids_by_zone_id, + ) + for candidate in attraction_candidates: + candidates_by_node_id[candidate.node_id].append(candidate) + + height_by_zone_id = { + definition.id: definition.presentation.height + for _, definition in enabled_definitions + } + assignments: dict[str, ZoneNodeAssignment] = {} + for node in node_records: + candidates = candidates_by_node_id.get(node.id, []) + if not candidates: + continue + seed_candidates = _unique_zone_candidates( + candidate for candidate in candidates if candidate.reason == "seed" + ) + attraction_candidates_for_node = _unique_zone_candidates( + candidate for candidate in candidates if candidate.reason == "attraction" + ) + if len(seed_candidates) > 1: + diagnostics.append( + ZoneDiagnostic( + severity="WARN", + code="ZONE_NODE_SEEDED_BY_MULTIPLE_ZONES", + message=f"Node {node.id} matched multiple zone membership rules.", + node_id=node.id, + zone_ids=tuple(candidate.zone_id for candidate in seed_candidates), + ) + ) + if len(attraction_candidates_for_node) > 1: + diagnostics.append( + ZoneDiagnostic( + severity="WARN", + code="ZONE_NODE_ATTRACTED_BY_MULTIPLE_ZONES", + message=f"Node {node.id} was attracted by multiple zones.", + node_id=node.id, + zone_ids=tuple(candidate.zone_id for candidate in attraction_candidates_for_node), + ) + ) + if seed_candidates and any( + candidate.zone_id not in {seed.zone_id for seed in seed_candidates} + for candidate in attraction_candidates_for_node + ): + diagnostics.append( + ZoneDiagnostic( + severity="INFO", + code="ZONE_SEED_OVERRIDES_ATTRACTION", + message=f"Node {node.id} kept seed membership over attraction.", + node_id=node.id, + zone_ids=tuple( + candidate.zone_id + for candidate in [*seed_candidates, *attraction_candidates_for_node] + ), + ) + ) + pool = seed_candidates or attraction_candidates_for_node + chosen = _choose_candidate(pool, height_by_zone_id) + assignments[node.id] = ZoneNodeAssignment( + node_id=node.id, + zone_id=chosen.zone_id, + reason=chosen.reason, + depth=chosen.depth, + ) + + boundary_edges: list[ZoneBoundaryEdge] = [] + internal_edge_ids_by_zone_id: dict[str, list[str]] = defaultdict(list) + boundary_edge_ids_by_zone_id: dict[str, list[str]] = defaultdict(list) + for edge in edge_records: + source_zone_id = assignments.get(edge.source).zone_id if edge.source in assignments else None + target_zone_id = assignments.get(edge.target).zone_id if edge.target in assignments else None + if source_zone_id and source_zone_id == target_zone_id: + internal_edge_ids_by_zone_id[source_zone_id].append(edge.id) + continue + if source_zone_id or target_zone_id: + boundary_edges.append( + ZoneBoundaryEdge( + edge_id=edge.id, + source=edge.source, + target=edge.target, + edge_type=edge.edge_type, + source_zone_id=source_zone_id, + target_zone_id=target_zone_id, + ) + ) + for zone_id in {source_zone_id, target_zone_id} - {None}: + boundary_edge_ids_by_zone_id[str(zone_id)].append(edge.id) + + zones: list[ZoneInstance] = [] + for _, definition in enabled_definitions: + node_ids = tuple( + node.id + for node in node_records + if node.id in assignments and assignments[node.id].zone_id == definition.id + ) + seed_node_ids = tuple( + node_id + for node_id in node_ids + if assignments[node_id].reason == "seed" + ) + attracted_node_ids = tuple( + node_id + for node_id in node_ids + if assignments[node_id].reason == "attraction" + ) + zone_diagnostics = tuple( + diagnostic + for diagnostic in diagnostics + if definition.id in diagnostic.zone_ids + ) + zones.append( + ZoneInstance( + id=definition.id, + label=definition.label or definition.id, + definition=definition, + node_ids=node_ids, + seed_node_ids=seed_node_ids, + attracted_node_ids=attracted_node_ids, + internal_edge_ids=tuple(internal_edge_ids_by_zone_id[definition.id]), + boundary_edge_ids=tuple(boundary_edge_ids_by_zone_id[definition.id]), + diagnostics=zone_diagnostics, + ) + ) + + return ZoneResolution( + zones=tuple(zones), + node_assignments=assignments, + boundary_edges=tuple(boundary_edges), + diagnostics=tuple(diagnostics), + ) + + +def _attraction_candidates( + node_records: tuple[_NodeRecord, ...], + edge_records: tuple[_EdgeRecord, ...], + enabled_definitions: tuple[tuple[int, ZoneDefinition], ...], + seed_node_ids_by_zone_id: Mapping[str, set[str]], +) -> list[_Candidate]: + nodes_by_id = {node.id: node for node in node_records} + adjacency: dict[str, list[_EdgeRecord]] = defaultdict(list) + for edge in edge_records: + adjacency[edge.source].append(edge) + adjacency[edge.target].append(edge) + + candidates: dict[tuple[str, str], _Candidate] = {} + for definition_order, definition in enabled_definitions: + seed_node_ids = seed_node_ids_by_zone_id.get(definition.id, set()) + if not seed_node_ids: + continue + for rule in definition.attraction_rules: + queue: deque[tuple[str, int]] = deque((node_id, 0) for node_id in seed_node_ids) + seen_depth_by_node_id = {node_id: 0 for node_id in seed_node_ids} + while queue: + node_id, depth = queue.popleft() + if depth >= rule.depth: + continue + for edge in adjacency.get(node_id, []): + if not _edge_matches_attraction_rule(edge, rule): + continue + neighbor_id = _neighbor_for_direction(node_id, edge, rule.direction) + if not neighbor_id: + continue + next_depth = depth + 1 + if seen_depth_by_node_id.get(neighbor_id, next_depth + 1) <= next_depth: + continue + neighbor = nodes_by_id.get(neighbor_id) + if not neighbor or not _rule_matches( + neighbor.data, + rule.node_filter, + empty_matches=True, + ): + continue + seen_depth_by_node_id[neighbor_id] = next_depth + queue.append((neighbor_id, next_depth)) + if neighbor_id in seed_node_ids: + continue + key = (neighbor_id, definition.id) + existing = candidates.get(key) + candidate = _Candidate( + node_id=neighbor_id, + zone_id=definition.id, + reason="attraction", + depth=next_depth, + definition_order=definition_order, + ) + if existing is None or candidate.depth < existing.depth: + candidates[key] = candidate + return sorted(candidates.values(), key=lambda candidate: (candidate.node_id, candidate.zone_id)) + + +def _node_records(nodes: Iterable[Mapping[str, Any]]) -> tuple[_NodeRecord, ...]: + records: list[_NodeRecord] = [] + for order, element in enumerate(nodes): + data = _element_data(element) + node_id = str(data.get("id") or data.get("stableKey") or "") + if node_id: + records.append(_NodeRecord(id=node_id, data=data, order=order)) + return tuple(records) + + +def _edge_records(edges: Iterable[Mapping[str, Any]]) -> tuple[_EdgeRecord, ...]: + records: list[_EdgeRecord] = [] + for order, element in enumerate(edges): + data = _element_data(element) + source = str(data.get("source") or "") + target = str(data.get("target") or "") + if not source or not target: + continue + edge_type = str(data.get("edgeType") or data.get("type") or data.get("canonicalType") or "") + edge_id = str(data.get("id") or data.get("stableKey") or f"edge:{source}:{target}:{order}") + records.append( + _EdgeRecord( + id=edge_id, + source=source, + target=target, + edge_type=edge_type, + data=data, + order=order, + ) + ) + return tuple(records) + + +def _element_data(element: Mapping[str, Any]) -> Mapping[str, Any]: + data = element.get("data") + if isinstance(data, Mapping): + return data + return element + + +def _edge_matches_attraction_rule(edge: _EdgeRecord, rule: ZoneAttractionRule) -> bool: + if rule.edge_type and rule.edge_type != "*" and edge.edge_type != rule.edge_type: + return False + return _rule_matches(edge.data, rule.edge_filter, empty_matches=True) + + +def _neighbor_for_direction(node_id: str, edge: _EdgeRecord, direction: str) -> str: + if direction in {"out", "both"} and edge.source == node_id: + return edge.target + if direction in {"in", "both"} and edge.target == node_id: + return edge.source + return "" + + +def _rule_matches(data: Mapping[str, Any], rule: Mapping[str, Any], *, empty_matches: bool) -> bool: + if not rule: + return empty_matches + if "all" in rule: + nested = rule.get("all") + return isinstance(nested, Iterable) and all( + isinstance(child, Mapping) and _rule_matches(data, child, empty_matches=False) + for child in nested + ) + if "any" in rule: + nested = rule.get("any") + return isinstance(nested, Iterable) and any( + isinstance(child, Mapping) and _rule_matches(data, child, empty_matches=False) + for child in nested + ) + if "rules" in rule: + nested = rule.get("rules") + return isinstance(nested, Iterable) and all( + isinstance(child, Mapping) and _rule_matches(data, child, empty_matches=False) + for child in nested + ) + + field = str(rule.get("field") or "") + op = str(rule.get("op") or "equals") + expected = rule.get("value") + actual = _field_value(data, field) + if op == "equals": + return actual == expected + if op == "in": + if isinstance(expected, str) or not isinstance(expected, Iterable): + return actual == expected + return actual in expected + if op == "exists": + return actual is not None and actual != "" and actual != [] + raise ValueError(f"Unsupported zone rule operator: {op}") + + +def _field_value(data: Mapping[str, Any], field: str) -> Any: + if not field: + return None + current: Any = data + for part in field.split("."): + if not isinstance(current, Mapping) or part not in current: + return None + current = current[part] + return current + + +def _unique_zone_candidates(candidates: Iterable[_Candidate]) -> list[_Candidate]: + by_zone_id: dict[str, _Candidate] = {} + for candidate in candidates: + existing = by_zone_id.get(candidate.zone_id) + if existing is None or (candidate.depth, candidate.definition_order) < ( + existing.depth, + existing.definition_order, + ): + by_zone_id[candidate.zone_id] = candidate + return list(by_zone_id.values()) + + +def _choose_candidate(candidates: Iterable[_Candidate], height_by_zone_id: Mapping[str, int]) -> _Candidate: + return sorted( + candidates, + key=lambda candidate: ( + -height_by_zone_id.get(candidate.zone_id, 0), + candidate.definition_order, + candidate.zone_id, + candidate.depth, + ), + )[0] + + +def _mapping_or_empty(value: Any) -> Mapping[str, Any]: + return value if isinstance(value, Mapping) else {} + + +def _mapping_or_none(value: Any) -> Mapping[str, Any] | None: + return value if isinstance(value, Mapping) else None + + +def _optional_string(value: Any) -> str | None: + if value is None or value == "": + return None + return str(value) diff --git a/tests/test_zone_view.py b/tests/test_zone_view.py new file mode 100644 index 0000000..e659147 --- /dev/null +++ b/tests/test_zone_view.py @@ -0,0 +1,231 @@ +from __future__ import annotations + +from railiance_fabric.zone_view import ZoneDefinition, resolve_zones + + +def _node(node_id: str, **data: object) -> dict[str, dict[str, object]]: + return {"data": {"id": node_id, **data}} + + +def _edge(edge_id: str, source: str, target: str, edge_type: str, **data: object) -> dict[str, dict[str, object]]: + return { + "data": { + "id": edge_id, + "source": source, + "target": target, + "edgeType": edge_type, + **data, + } + } + + +def test_zone_definition_round_trips() -> None: + definition = ZoneDefinition.from_dict( + { + "id": "prod", + "label": "Production", + "membership": { + "rules": [ + {"field": "deploymentEnvironment", "op": "equals", "value": "prod"}, + {"field": "kind", "op": "in", "value": ["service", "endpoint"]}, + ] + }, + "attraction": { + "rules": [ + { + "edge_type": "routes_to_service", + "direction": "out", + "depth": 2, + "node_filter": {"field": "kind", "op": "exists"}, + } + ] + }, + "layout": {"algorithm": "cose", "options": {"animate": False}}, + "presentation": {"height": 30, "color": "#be123c", "opacity": 0.2, "blur_below": True}, + "collapse": {"enabled": True, "label": "Production Zone"}, + } + ) + + assert definition.id == "prod" + assert definition.label == "Production" + assert definition.attraction_rules[0].edge_type == "routes_to_service" + assert definition.attraction_rules[0].depth == 2 + assert definition.layout.algorithm == "cose" + assert definition.presentation.height == 30 + assert definition.collapse.enabled is True + + round_tripped = ZoneDefinition.from_dict(definition.to_dict()) + + assert round_tripped == definition + + +def test_resolver_assigns_seed_nodes_and_boundary_edges() -> None: + resolution = resolve_zones( + nodes=[ + _node("svc.prod", deploymentEnvironment="prod"), + _node("svc.test", deploymentEnvironment="test"), + ], + edges=[ + _edge("edge.prod-test", "svc.prod", "svc.test", "routes_to_service"), + ], + zone_definitions=[ + { + "id": "prod", + "label": "prod", + "membership": {"field": "deploymentEnvironment", "op": "equals", "value": "prod"}, + "presentation": {"height": 20}, + }, + { + "id": "test", + "label": "test", + "membership": {"field": "deploymentEnvironment", "op": "equals", "value": "test"}, + "presentation": {"height": 10}, + }, + ], + ) + + assert resolution.node_assignments["svc.prod"].zone_id == "prod" + assert resolution.node_assignments["svc.prod"].reason == "seed" + assert resolution.zone_by_id("prod").seed_node_ids == ("svc.prod",) + assert resolution.zone_by_id("test").seed_node_ids == ("svc.test",) + assert resolution.boundary_edges[0].edge_id == "edge.prod-test" + assert resolution.boundary_edges[0].source_zone_id == "prod" + assert resolution.boundary_edges[0].target_zone_id == "test" + + +def test_resolver_attracts_nodes_by_edge_type_direction_and_depth() -> None: + resolution = resolve_zones( + nodes=[ + _node("seed", deploymentEnvironment="prod", kind="service"), + _node("near", kind="endpoint"), + _node("far", kind="endpoint"), + ], + edges=[ + _edge("edge.seed-near", "seed", "near", "routes_to_service"), + _edge("edge.near-far", "near", "far", "routes_to_service"), + ], + zone_definitions=[ + { + "id": "prod", + "membership": {"field": "deploymentEnvironment", "op": "equals", "value": "prod"}, + "attraction": { + "rules": [ + { + "edge_type": "routes_to_service", + "direction": "out", + "depth": 1, + "node_filter": {"field": "kind", "op": "exists"}, + } + ] + }, + } + ], + ) + + assert resolution.node_assignments["near"].zone_id == "prod" + assert resolution.node_assignments["near"].reason == "attraction" + assert resolution.node_assignments["near"].depth == 1 + assert "far" not in resolution.node_assignments + assert resolution.zone_by_id("prod").internal_edge_ids == ("edge.seed-near",) + assert resolution.zone_by_id("prod").boundary_edge_ids == ("edge.near-far",) + + +def test_resolver_keeps_seed_membership_over_attraction() -> None: + resolution = resolve_zones( + nodes=[ + _node("seed", deploymentEnvironment="prod", kind="service"), + _node("tenant", deploymentEnvironment="tenant", kind="endpoint"), + ], + edges=[ + _edge("edge.seed-tenant", "seed", "tenant", "routes_to_service"), + ], + zone_definitions=[ + { + "id": "prod", + "membership": {"field": "deploymentEnvironment", "op": "equals", "value": "prod"}, + "attraction": { + "rules": [ + { + "edge_type": "routes_to_service", + "direction": "out", + "depth": 1, + } + ] + }, + "presentation": {"height": 100}, + }, + { + "id": "tenant", + "membership": {"field": "deploymentEnvironment", "op": "equals", "value": "tenant"}, + "presentation": {"height": 0}, + }, + ], + ) + + assert resolution.node_assignments["tenant"].zone_id == "tenant" + assert resolution.node_assignments["tenant"].reason == "seed" + assert "ZONE_SEED_OVERRIDES_ATTRACTION" in { + diagnostic.code for diagnostic in resolution.diagnostics + } + + +def test_resolver_uses_height_then_definition_order_for_overlapping_membership() -> None: + height_resolution = resolve_zones( + nodes=[_node("shared", deploymentEnvironment="prod", owner="lord")], + edges=[], + zone_definitions=[ + { + "id": "lower", + "membership": {"field": "deploymentEnvironment", "op": "equals", "value": "prod"}, + "presentation": {"height": 10}, + }, + { + "id": "higher", + "membership": {"field": "owner", "op": "equals", "value": "lord"}, + "presentation": {"height": 20}, + }, + ], + ) + + assert height_resolution.node_assignments["shared"].zone_id == "higher" + assert "ZONE_NODE_SEEDED_BY_MULTIPLE_ZONES" in { + diagnostic.code for diagnostic in height_resolution.diagnostics + } + + order_resolution = resolve_zones( + nodes=[_node("shared", deploymentEnvironment="prod", owner="lord")], + edges=[], + zone_definitions=[ + { + "id": "first", + "membership": {"field": "deploymentEnvironment", "op": "equals", "value": "prod"}, + "presentation": {"height": 10}, + }, + { + "id": "second", + "membership": {"field": "owner", "op": "equals", "value": "lord"}, + "presentation": {"height": 10}, + }, + ], + ) + + assert order_resolution.node_assignments["shared"].zone_id == "first" + + +def test_resolver_serializes_resolution() -> None: + resolution = resolve_zones( + nodes=[_node("svc", deploymentEnvironment="prod")], + edges=[], + zone_definitions=[ + { + "id": "prod", + "membership": {"field": "deploymentEnvironment", "op": "equals", "value": "prod"}, + } + ], + ) + + serialized = resolution.to_dict() + + assert serialized["zones"][0]["id"] == "prod" + assert serialized["node_assignments"]["svc"]["zone_id"] == "prod" + assert serialized["node_assignments"]["svc"]["reason"] == "seed" diff --git a/workplans/RAIL-FAB-WP-0022-zone-entity-visualization-engine.md b/workplans/RAIL-FAB-WP-0022-zone-entity-visualization-engine.md index 237d6ef..c0c8bf5 100644 --- a/workplans/RAIL-FAB-WP-0022-zone-entity-visualization-engine.md +++ b/workplans/RAIL-FAB-WP-0022-zone-entity-visualization-engine.md @@ -34,7 +34,7 @@ step. ```task id: RAIL-FAB-WP-0022-T01 -status: in_progress +status: done priority: high state_hub_task_id: "636b14f7-e5c7-4d5d-acda-40f0c270cd29" ``` @@ -55,11 +55,17 @@ but it does not need a persistence store in this task. Expected result: a small typed model or schema with tests for construction and basic serialization. +Result: Added `railiance_fabric.zone_view` dataclasses for zone definitions, +attraction rules, layout, presentation, collapse settings, resolved zone +instances, node assignments, boundary edges, diagnostics, and serializable +resolution output. Covered definition round-trip and resolution serialization in +`tests/test_zone_view.py`. + ## Task 2: Implement A Pure Zone Resolver ```task id: RAIL-FAB-WP-0022-T02 -status: in_progress +status: done priority: high state_hub_task_id: "d0ca41d2-f7c4-4409-8799-4c0099192af5" ``` @@ -78,6 +84,13 @@ The first resolver should support a conservative rule subset: Expected result: resolver unit tests that prove deterministic node assignment, seed-vs-attraction precedence, conflict reporting, and depth-limited attraction. +Result: Implemented `resolve_zones()` as a pure resolver over Cytoscape-style or +raw node/edge mappings. It supports `equals`, `in`, `exists`, `all`, `any`, +membership `rules`, edge-type/direction/depth attraction, seed precedence, +height/order conflict resolution, single-zone node assignments, boundary edge +summaries, and conflict diagnostics. Covered the core behavior in +`tests/test_zone_view.py`. + ## Task 3: Back The Existing Overlays With Zone Definitions ```task