from __future__ import annotations from collections import defaultdict, deque from collections.abc import Iterable, Mapping from dataclasses import dataclass, field from typing import Any @dataclass(frozen=True) class ZoneAttractionRule: edge_type: str | None = None direction: str = "both" depth: int = 1 node_filter: Mapping[str, Any] = field(default_factory=dict) edge_filter: Mapping[str, Any] = field(default_factory=dict) def __post_init__(self) -> None: if self.direction not in {"out", "in", "both"}: raise ValueError(f"Unsupported attraction direction: {self.direction}") if self.depth < 0: raise ValueError("Attraction depth must be zero or greater") @classmethod def from_dict(cls, data: Mapping[str, Any]) -> ZoneAttractionRule: return cls( edge_type=_optional_string(data.get("edge_type", data.get("edgeType"))), direction=str(data.get("direction") or "both"), depth=int(data.get("depth", 1)), node_filter=_mapping_or_empty(data.get("node_filter", data.get("nodeFilter"))), edge_filter=_mapping_or_empty(data.get("edge_filter", data.get("edgeFilter"))), ) def to_dict(self) -> dict[str, Any]: result: dict[str, Any] = { "direction": self.direction, "depth": self.depth, } if self.edge_type: result["edge_type"] = self.edge_type if self.node_filter: result["node_filter"] = dict(self.node_filter) if self.edge_filter: result["edge_filter"] = dict(self.edge_filter) return result @dataclass(frozen=True) class ZoneLayout: algorithm: str = "preset" options: Mapping[str, Any] = field(default_factory=dict) @classmethod def from_dict(cls, data: Mapping[str, Any] | None) -> ZoneLayout: if not data: return cls() return cls( algorithm=str(data.get("algorithm") or "preset"), options=_mapping_or_empty(data.get("options")), ) def to_dict(self) -> dict[str, Any]: return {"algorithm": self.algorithm, "options": dict(self.options)} @dataclass(frozen=True) class ZonePresentation: height: int = 0 color: str = "" opacity: float = 0.16 blur_below: bool = False @classmethod def from_dict(cls, data: Mapping[str, Any] | None) -> ZonePresentation: if not data: return cls() return cls( height=int(data.get("height", 0)), color=str(data.get("color") or ""), opacity=float(data.get("opacity", 0.16)), blur_below=bool(data.get("blur_below", data.get("blurBelow", False))), ) def to_dict(self) -> dict[str, Any]: return { "height": self.height, "color": self.color, "opacity": self.opacity, "blur_below": self.blur_below, } @dataclass(frozen=True) class ZoneCollapse: enabled: bool = False label: str = "" @classmethod def from_dict(cls, data: Mapping[str, Any] | None) -> ZoneCollapse: if not data: return cls() return cls( enabled=bool(data.get("enabled", False)), label=str(data.get("label") or ""), ) def to_dict(self) -> dict[str, Any]: return {"enabled": self.enabled, "label": self.label} @dataclass(frozen=True) class ZoneDefinition: id: str label: str = "" enabled: bool = True membership: Mapping[str, Any] = field(default_factory=dict) attraction_rules: tuple[ZoneAttractionRule, ...] = () layout: ZoneLayout = field(default_factory=ZoneLayout) presentation: ZonePresentation = field(default_factory=ZonePresentation) collapse: ZoneCollapse = field(default_factory=ZoneCollapse) @classmethod def from_dict(cls, data: Mapping[str, Any]) -> ZoneDefinition: attraction = data.get("attraction", ()) if isinstance(attraction, Mapping): raw_attraction_rules = attraction.get("rules", ()) else: raw_attraction_rules = attraction rules = tuple( rule if isinstance(rule, ZoneAttractionRule) else ZoneAttractionRule.from_dict(rule) for rule in raw_attraction_rules if isinstance(rule, Mapping) or isinstance(rule, ZoneAttractionRule) ) zone_id = str(data["id"]) return cls( id=zone_id, label=str(data.get("label") or zone_id), enabled=bool(data.get("enabled", True)), membership=_mapping_or_empty(data.get("membership")), attraction_rules=rules, layout=ZoneLayout.from_dict(_mapping_or_none(data.get("layout"))), presentation=ZonePresentation.from_dict(_mapping_or_none(data.get("presentation"))), collapse=ZoneCollapse.from_dict(_mapping_or_none(data.get("collapse"))), ) def to_dict(self) -> dict[str, Any]: return { "id": self.id, "label": self.label or self.id, "enabled": self.enabled, "membership": dict(self.membership), "attraction": {"rules": [rule.to_dict() for rule in self.attraction_rules]}, "layout": self.layout.to_dict(), "presentation": self.presentation.to_dict(), "collapse": self.collapse.to_dict(), } @dataclass(frozen=True) class ZoneNodeAssignment: node_id: str zone_id: str reason: str depth: int = 0 def to_dict(self) -> dict[str, Any]: return { "node_id": self.node_id, "zone_id": self.zone_id, "reason": self.reason, "depth": self.depth, } @dataclass(frozen=True) class ZoneBoundaryEdge: edge_id: str source: str target: str edge_type: str source_zone_id: str | None target_zone_id: str | None def to_dict(self) -> dict[str, Any]: return { "edge_id": self.edge_id, "source": self.source, "target": self.target, "edge_type": self.edge_type, "source_zone_id": self.source_zone_id, "target_zone_id": self.target_zone_id, } @dataclass(frozen=True) class ZoneDiagnostic: severity: str code: str message: str node_id: str | None = None edge_id: str | None = None zone_ids: tuple[str, ...] = () def to_dict(self) -> dict[str, Any]: result: dict[str, Any] = { "severity": self.severity, "code": self.code, "message": self.message, "zone_ids": list(self.zone_ids), } if self.node_id: result["node_id"] = self.node_id if self.edge_id: result["edge_id"] = self.edge_id return result @dataclass(frozen=True) class ZoneInstance: id: str label: str definition: ZoneDefinition node_ids: tuple[str, ...] seed_node_ids: tuple[str, ...] attracted_node_ids: tuple[str, ...] internal_edge_ids: tuple[str, ...] boundary_edge_ids: tuple[str, ...] diagnostics: tuple[ZoneDiagnostic, ...] def to_dict(self) -> dict[str, Any]: return { "id": self.id, "label": self.label, "definition": self.definition.to_dict(), "node_ids": list(self.node_ids), "seed_node_ids": list(self.seed_node_ids), "attracted_node_ids": list(self.attracted_node_ids), "internal_edge_ids": list(self.internal_edge_ids), "boundary_edge_ids": list(self.boundary_edge_ids), "diagnostics": [diagnostic.to_dict() for diagnostic in self.diagnostics], } @dataclass(frozen=True) class ZoneResolution: zones: tuple[ZoneInstance, ...] node_assignments: Mapping[str, ZoneNodeAssignment] boundary_edges: tuple[ZoneBoundaryEdge, ...] diagnostics: tuple[ZoneDiagnostic, ...] def zone_by_id(self, zone_id: str) -> ZoneInstance | None: return next((zone for zone in self.zones if zone.id == zone_id), None) def to_dict(self) -> dict[str, Any]: return { "zones": [zone.to_dict() for zone in self.zones], "node_assignments": { node_id: assignment.to_dict() for node_id, assignment in self.node_assignments.items() }, "boundary_edges": [edge.to_dict() for edge in self.boundary_edges], "diagnostics": [diagnostic.to_dict() for diagnostic in self.diagnostics], } @dataclass(frozen=True) class _NodeRecord: id: str data: Mapping[str, Any] order: int @dataclass(frozen=True) class _EdgeRecord: id: str source: str target: str edge_type: str data: Mapping[str, Any] order: int @dataclass(frozen=True) class _Candidate: node_id: str zone_id: str reason: str depth: int definition_order: int def resolve_zones( nodes: Iterable[Mapping[str, Any]], edges: Iterable[Mapping[str, Any]], zone_definitions: Iterable[ZoneDefinition | Mapping[str, Any]], ) -> ZoneResolution: """Resolve zone definitions against graph data without mutating the graph.""" node_records = _node_records(nodes) edge_records = _edge_records(edges) definitions = tuple( definition if isinstance(definition, ZoneDefinition) else ZoneDefinition.from_dict(definition) for definition in zone_definitions ) enabled_definitions = tuple( (index, definition) for index, definition in enumerate(definitions) if definition.enabled ) diagnostics: list[ZoneDiagnostic] = [] candidates_by_node_id: dict[str, list[_Candidate]] = defaultdict(list) seed_node_ids_by_zone_id: dict[str, set[str]] = defaultdict(set) for definition_order, definition in enabled_definitions: for node in node_records: if _rule_matches(node.data, definition.membership, empty_matches=False): seed_node_ids_by_zone_id[definition.id].add(node.id) candidates_by_node_id[node.id].append( _Candidate(node.id, definition.id, "seed", 0, definition_order) ) if not seed_node_ids_by_zone_id[definition.id]: diagnostics.append( ZoneDiagnostic( severity="WARN", code="ZONE_EMPTY_SEED_SET", message=f"Zone {definition.id} did not match any seed nodes.", zone_ids=(definition.id,), ) ) attraction_candidates = _attraction_candidates( node_records, edge_records, enabled_definitions, seed_node_ids_by_zone_id, diagnostics, ) for candidate in attraction_candidates: candidates_by_node_id[candidate.node_id].append(candidate) height_by_zone_id = { definition.id: definition.presentation.height for _, definition in enabled_definitions } assignments: dict[str, ZoneNodeAssignment] = {} for node in node_records: candidates = candidates_by_node_id.get(node.id, []) if not candidates: continue seed_candidates = _unique_zone_candidates( candidate for candidate in candidates if candidate.reason == "seed" ) attraction_candidates_for_node = _unique_zone_candidates( candidate for candidate in candidates if candidate.reason == "attraction" ) if len(seed_candidates) > 1: diagnostics.append( ZoneDiagnostic( severity="WARN", code="ZONE_NODE_SEEDED_BY_MULTIPLE_ZONES", message=f"Node {node.id} matched multiple zone membership rules.", node_id=node.id, zone_ids=tuple(candidate.zone_id for candidate in seed_candidates), ) ) if len(attraction_candidates_for_node) > 1: diagnostics.append( ZoneDiagnostic( severity="WARN", code="ZONE_NODE_ATTRACTED_BY_MULTIPLE_ZONES", message=f"Node {node.id} was attracted by multiple zones.", node_id=node.id, zone_ids=tuple(candidate.zone_id for candidate in attraction_candidates_for_node), ) ) if seed_candidates and any( candidate.zone_id not in {seed.zone_id for seed in seed_candidates} for candidate in attraction_candidates_for_node ): diagnostics.append( ZoneDiagnostic( severity="INFO", code="ZONE_SEED_OVERRIDES_ATTRACTION", message=f"Node {node.id} kept seed membership over attraction.", node_id=node.id, zone_ids=tuple( candidate.zone_id for candidate in [*seed_candidates, *attraction_candidates_for_node] ), ) ) pool = seed_candidates or attraction_candidates_for_node chosen = _choose_candidate(pool, height_by_zone_id) assignments[node.id] = ZoneNodeAssignment( node_id=node.id, zone_id=chosen.zone_id, reason=chosen.reason, depth=chosen.depth, ) boundary_edges: list[ZoneBoundaryEdge] = [] internal_edge_ids_by_zone_id: dict[str, list[str]] = defaultdict(list) boundary_edge_ids_by_zone_id: dict[str, list[str]] = defaultdict(list) for edge in edge_records: if _is_context_only_edge(edge): continue source_zone_id = assignments.get(edge.source).zone_id if edge.source in assignments else None target_zone_id = assignments.get(edge.target).zone_id if edge.target in assignments else None if source_zone_id and source_zone_id == target_zone_id: internal_edge_ids_by_zone_id[source_zone_id].append(edge.id) continue if source_zone_id or target_zone_id: zone_ids = tuple(sorted(str(zone_id) for zone_id in {source_zone_id, target_zone_id} - {None})) diagnostics.append( ZoneDiagnostic( severity="INFO", code="ZONE_EDGE_CROSSES_ZONE_BOUNDARY", message=f"Edge {edge.id} crosses a zone boundary.", edge_id=edge.id, zone_ids=zone_ids, ) ) boundary_edges.append( ZoneBoundaryEdge( edge_id=edge.id, source=edge.source, target=edge.target, edge_type=edge.edge_type, source_zone_id=source_zone_id, target_zone_id=target_zone_id, ) ) for zone_id in {source_zone_id, target_zone_id} - {None}: boundary_edge_ids_by_zone_id[str(zone_id)].append(edge.id) zones: list[ZoneInstance] = [] for _, definition in enabled_definitions: node_ids = tuple( node.id for node in node_records if node.id in assignments and assignments[node.id].zone_id == definition.id ) seed_node_ids = tuple( node_id for node_id in node_ids if assignments[node_id].reason == "seed" ) attracted_node_ids = tuple( node_id for node_id in node_ids if assignments[node_id].reason == "attraction" ) zone_diagnostics = tuple( diagnostic for diagnostic in diagnostics if definition.id in diagnostic.zone_ids ) zones.append( ZoneInstance( id=definition.id, label=definition.label or definition.id, definition=definition, node_ids=node_ids, seed_node_ids=seed_node_ids, attracted_node_ids=attracted_node_ids, internal_edge_ids=tuple(internal_edge_ids_by_zone_id[definition.id]), boundary_edge_ids=tuple(boundary_edge_ids_by_zone_id[definition.id]), diagnostics=zone_diagnostics, ) ) return ZoneResolution( zones=tuple(zones), node_assignments=assignments, boundary_edges=tuple(boundary_edges), diagnostics=tuple(diagnostics), ) def _attraction_candidates( node_records: tuple[_NodeRecord, ...], edge_records: tuple[_EdgeRecord, ...], enabled_definitions: tuple[tuple[int, ZoneDefinition], ...], seed_node_ids_by_zone_id: Mapping[str, set[str]], diagnostics: list[ZoneDiagnostic], ) -> list[_Candidate]: nodes_by_id = {node.id: node for node in node_records} adjacency: dict[str, list[_EdgeRecord]] = defaultdict(list) for edge in edge_records: if _is_context_only_edge(edge): continue adjacency[edge.source].append(edge) adjacency[edge.target].append(edge) candidates: dict[tuple[str, str], _Candidate] = {} depth_limit_diagnostics: set[tuple[str, str]] = set() for definition_order, definition in enabled_definitions: seed_node_ids = seed_node_ids_by_zone_id.get(definition.id, set()) if not seed_node_ids: continue for rule in definition.attraction_rules: queue: deque[tuple[str, int]] = deque((node_id, 0) for node_id in seed_node_ids) seen_depth_by_node_id = {node_id: 0 for node_id in seed_node_ids} while queue: node_id, depth = queue.popleft() if depth >= rule.depth: if _has_matching_attraction_neighbor( node_id, adjacency.get(node_id, []), nodes_by_id, rule, ): key = (definition.id, node_id) if key not in depth_limit_diagnostics: depth_limit_diagnostics.add(key) diagnostics.append( ZoneDiagnostic( severity="INFO", code="ZONE_ATTRACTION_DEPTH_LIMIT_REACHED", message=( f"Zone {definition.id} reached attraction depth " f"{rule.depth} at node {node_id}." ), node_id=node_id, zone_ids=(definition.id,), ) ) continue for edge in adjacency.get(node_id, []): if not _edge_matches_attraction_rule(edge, rule): continue neighbor_id = _neighbor_for_direction(node_id, edge, rule.direction) if not neighbor_id: continue next_depth = depth + 1 if seen_depth_by_node_id.get(neighbor_id, next_depth + 1) <= next_depth: continue neighbor = nodes_by_id.get(neighbor_id) if not neighbor or not _rule_matches( neighbor.data, rule.node_filter, empty_matches=True, ): continue seen_depth_by_node_id[neighbor_id] = next_depth queue.append((neighbor_id, next_depth)) if neighbor_id in seed_node_ids: continue key = (neighbor_id, definition.id) existing = candidates.get(key) candidate = _Candidate( node_id=neighbor_id, zone_id=definition.id, reason="attraction", depth=next_depth, definition_order=definition_order, ) if existing is None or candidate.depth < existing.depth: candidates[key] = candidate return sorted(candidates.values(), key=lambda candidate: (candidate.node_id, candidate.zone_id)) def _has_matching_attraction_neighbor( node_id: str, edges: Iterable[_EdgeRecord], nodes_by_id: Mapping[str, _NodeRecord], rule: ZoneAttractionRule, ) -> bool: for edge in edges: if not _edge_matches_attraction_rule(edge, rule): continue neighbor_id = _neighbor_for_direction(node_id, edge, rule.direction) if not neighbor_id: continue neighbor = nodes_by_id.get(neighbor_id) if neighbor and _rule_matches(neighbor.data, rule.node_filter, empty_matches=True): return True return False def _node_records(nodes: Iterable[Mapping[str, Any]]) -> tuple[_NodeRecord, ...]: records: list[_NodeRecord] = [] for order, element in enumerate(nodes): data = _element_data(element) node_id = str(data.get("id") or data.get("stableKey") or "") if node_id: records.append(_NodeRecord(id=node_id, data=data, order=order)) return tuple(records) def _edge_records(edges: Iterable[Mapping[str, Any]]) -> tuple[_EdgeRecord, ...]: records: list[_EdgeRecord] = [] for order, element in enumerate(edges): data = _element_data(element) source = str(data.get("source") or "") target = str(data.get("target") or "") if not source or not target: continue edge_type = str(data.get("edgeType") or data.get("type") or data.get("canonicalType") or "") edge_id = str(data.get("id") or data.get("stableKey") or f"edge:{source}:{target}:{order}") records.append( _EdgeRecord( id=edge_id, source=source, target=target, edge_type=edge_type, data=data, order=order, ) ) return tuple(records) def _element_data(element: Mapping[str, Any]) -> Mapping[str, Any]: data = element.get("data") if isinstance(data, Mapping): return data return element def _edge_matches_attraction_rule(edge: _EdgeRecord, rule: ZoneAttractionRule) -> bool: if rule.edge_type and rule.edge_type != "*" and edge.edge_type != rule.edge_type: return False return _rule_matches(edge.data, rule.edge_filter, empty_matches=True) def _is_context_only_edge(edge: _EdgeRecord) -> bool: return _trueish(edge.data.get("displayOnly", edge.data.get("display_only"))) or edge.edge_type == "declares" def _trueish(value: Any) -> bool: return value is True or str(value).lower() == "true" def _neighbor_for_direction(node_id: str, edge: _EdgeRecord, direction: str) -> str: if direction in {"out", "both"} and edge.source == node_id: return edge.target if direction in {"in", "both"} and edge.target == node_id: return edge.source return "" def _rule_matches(data: Mapping[str, Any], rule: Mapping[str, Any], *, empty_matches: bool) -> bool: if not rule: return empty_matches if "all" in rule: nested = rule.get("all") return isinstance(nested, Iterable) and all( isinstance(child, Mapping) and _rule_matches(data, child, empty_matches=False) for child in nested ) if "any" in rule: nested = rule.get("any") return isinstance(nested, Iterable) and any( isinstance(child, Mapping) and _rule_matches(data, child, empty_matches=False) for child in nested ) if "rules" in rule: nested = rule.get("rules") return isinstance(nested, Iterable) and all( isinstance(child, Mapping) and _rule_matches(data, child, empty_matches=False) for child in nested ) field = str(rule.get("field") or "") op = str(rule.get("op") or "equals") expected = rule.get("value") actual = _field_value(data, field) if op == "equals": return actual == expected if op == "in": if isinstance(expected, str) or not isinstance(expected, Iterable): return actual == expected return actual in expected if op == "exists": return actual is not None and actual != "" and actual != [] raise ValueError(f"Unsupported zone rule operator: {op}") def _field_value(data: Mapping[str, Any], field: str) -> Any: if not field: return None current: Any = data for part in field.split("."): if not isinstance(current, Mapping) or part not in current: return None current = current[part] return current def _unique_zone_candidates(candidates: Iterable[_Candidate]) -> list[_Candidate]: by_zone_id: dict[str, _Candidate] = {} for candidate in candidates: existing = by_zone_id.get(candidate.zone_id) if existing is None or (candidate.depth, candidate.definition_order) < ( existing.depth, existing.definition_order, ): by_zone_id[candidate.zone_id] = candidate return list(by_zone_id.values()) def _choose_candidate(candidates: Iterable[_Candidate], height_by_zone_id: Mapping[str, int]) -> _Candidate: return sorted( candidates, key=lambda candidate: ( -height_by_zone_id.get(candidate.zone_id, 0), candidate.definition_order, candidate.zone_id, candidate.depth, ), )[0] def _mapping_or_empty(value: Any) -> Mapping[str, Any]: return value if isinstance(value, Mapping) else {} def _mapping_or_none(value: Any) -> Mapping[str, Any] | None: return value if isinstance(value, Mapping) else None def _optional_string(value: Any) -> str | None: if value is None or value == "": return None return str(value)