Files

726 lines
26 KiB
Python

from __future__ import annotations
from collections import defaultdict, deque
from collections.abc import Iterable, Mapping
from dataclasses import dataclass, field
from typing import Any
@dataclass(frozen=True)
class ZoneAttractionRule:
edge_type: str | None = None
direction: str = "both"
depth: int = 1
node_filter: Mapping[str, Any] = field(default_factory=dict)
edge_filter: Mapping[str, Any] = field(default_factory=dict)
def __post_init__(self) -> None:
if self.direction not in {"out", "in", "both"}:
raise ValueError(f"Unsupported attraction direction: {self.direction}")
if self.depth < 0:
raise ValueError("Attraction depth must be zero or greater")
@classmethod
def from_dict(cls, data: Mapping[str, Any]) -> ZoneAttractionRule:
return cls(
edge_type=_optional_string(data.get("edge_type", data.get("edgeType"))),
direction=str(data.get("direction") or "both"),
depth=int(data.get("depth", 1)),
node_filter=_mapping_or_empty(data.get("node_filter", data.get("nodeFilter"))),
edge_filter=_mapping_or_empty(data.get("edge_filter", data.get("edgeFilter"))),
)
def to_dict(self) -> dict[str, Any]:
result: dict[str, Any] = {
"direction": self.direction,
"depth": self.depth,
}
if self.edge_type:
result["edge_type"] = self.edge_type
if self.node_filter:
result["node_filter"] = dict(self.node_filter)
if self.edge_filter:
result["edge_filter"] = dict(self.edge_filter)
return result
@dataclass(frozen=True)
class ZoneLayout:
algorithm: str = "preset"
options: Mapping[str, Any] = field(default_factory=dict)
@classmethod
def from_dict(cls, data: Mapping[str, Any] | None) -> ZoneLayout:
if not data:
return cls()
return cls(
algorithm=str(data.get("algorithm") or "preset"),
options=_mapping_or_empty(data.get("options")),
)
def to_dict(self) -> dict[str, Any]:
return {"algorithm": self.algorithm, "options": dict(self.options)}
@dataclass(frozen=True)
class ZonePresentation:
height: int = 0
color: str = ""
opacity: float = 0.16
blur_below: bool = False
@classmethod
def from_dict(cls, data: Mapping[str, Any] | None) -> ZonePresentation:
if not data:
return cls()
return cls(
height=int(data.get("height", 0)),
color=str(data.get("color") or ""),
opacity=float(data.get("opacity", 0.16)),
blur_below=bool(data.get("blur_below", data.get("blurBelow", False))),
)
def to_dict(self) -> dict[str, Any]:
return {
"height": self.height,
"color": self.color,
"opacity": self.opacity,
"blur_below": self.blur_below,
}
@dataclass(frozen=True)
class ZoneCollapse:
enabled: bool = False
label: str = ""
@classmethod
def from_dict(cls, data: Mapping[str, Any] | None) -> ZoneCollapse:
if not data:
return cls()
return cls(
enabled=bool(data.get("enabled", False)),
label=str(data.get("label") or ""),
)
def to_dict(self) -> dict[str, Any]:
return {"enabled": self.enabled, "label": self.label}
@dataclass(frozen=True)
class ZoneDefinition:
id: str
label: str = ""
enabled: bool = True
membership: Mapping[str, Any] = field(default_factory=dict)
attraction_rules: tuple[ZoneAttractionRule, ...] = ()
layout: ZoneLayout = field(default_factory=ZoneLayout)
presentation: ZonePresentation = field(default_factory=ZonePresentation)
collapse: ZoneCollapse = field(default_factory=ZoneCollapse)
@classmethod
def from_dict(cls, data: Mapping[str, Any]) -> ZoneDefinition:
attraction = data.get("attraction", ())
if isinstance(attraction, Mapping):
raw_attraction_rules = attraction.get("rules", ())
else:
raw_attraction_rules = attraction
rules = tuple(
rule if isinstance(rule, ZoneAttractionRule) else ZoneAttractionRule.from_dict(rule)
for rule in raw_attraction_rules
if isinstance(rule, Mapping) or isinstance(rule, ZoneAttractionRule)
)
zone_id = str(data["id"])
return cls(
id=zone_id,
label=str(data.get("label") or zone_id),
enabled=bool(data.get("enabled", True)),
membership=_mapping_or_empty(data.get("membership")),
attraction_rules=rules,
layout=ZoneLayout.from_dict(_mapping_or_none(data.get("layout"))),
presentation=ZonePresentation.from_dict(_mapping_or_none(data.get("presentation"))),
collapse=ZoneCollapse.from_dict(_mapping_or_none(data.get("collapse"))),
)
def to_dict(self) -> dict[str, Any]:
return {
"id": self.id,
"label": self.label or self.id,
"enabled": self.enabled,
"membership": dict(self.membership),
"attraction": {"rules": [rule.to_dict() for rule in self.attraction_rules]},
"layout": self.layout.to_dict(),
"presentation": self.presentation.to_dict(),
"collapse": self.collapse.to_dict(),
}
@dataclass(frozen=True)
class ZoneNodeAssignment:
node_id: str
zone_id: str
reason: str
depth: int = 0
def to_dict(self) -> dict[str, Any]:
return {
"node_id": self.node_id,
"zone_id": self.zone_id,
"reason": self.reason,
"depth": self.depth,
}
@dataclass(frozen=True)
class ZoneBoundaryEdge:
edge_id: str
source: str
target: str
edge_type: str
source_zone_id: str | None
target_zone_id: str | None
def to_dict(self) -> dict[str, Any]:
return {
"edge_id": self.edge_id,
"source": self.source,
"target": self.target,
"edge_type": self.edge_type,
"source_zone_id": self.source_zone_id,
"target_zone_id": self.target_zone_id,
}
@dataclass(frozen=True)
class ZoneDiagnostic:
severity: str
code: str
message: str
node_id: str | None = None
edge_id: str | None = None
zone_ids: tuple[str, ...] = ()
def to_dict(self) -> dict[str, Any]:
result: dict[str, Any] = {
"severity": self.severity,
"code": self.code,
"message": self.message,
"zone_ids": list(self.zone_ids),
}
if self.node_id:
result["node_id"] = self.node_id
if self.edge_id:
result["edge_id"] = self.edge_id
return result
@dataclass(frozen=True)
class ZoneInstance:
id: str
label: str
definition: ZoneDefinition
node_ids: tuple[str, ...]
seed_node_ids: tuple[str, ...]
attracted_node_ids: tuple[str, ...]
internal_edge_ids: tuple[str, ...]
boundary_edge_ids: tuple[str, ...]
diagnostics: tuple[ZoneDiagnostic, ...]
def to_dict(self) -> dict[str, Any]:
return {
"id": self.id,
"label": self.label,
"definition": self.definition.to_dict(),
"node_ids": list(self.node_ids),
"seed_node_ids": list(self.seed_node_ids),
"attracted_node_ids": list(self.attracted_node_ids),
"internal_edge_ids": list(self.internal_edge_ids),
"boundary_edge_ids": list(self.boundary_edge_ids),
"diagnostics": [diagnostic.to_dict() for diagnostic in self.diagnostics],
}
@dataclass(frozen=True)
class ZoneResolution:
zones: tuple[ZoneInstance, ...]
node_assignments: Mapping[str, ZoneNodeAssignment]
boundary_edges: tuple[ZoneBoundaryEdge, ...]
diagnostics: tuple[ZoneDiagnostic, ...]
def zone_by_id(self, zone_id: str) -> ZoneInstance | None:
return next((zone for zone in self.zones if zone.id == zone_id), None)
def to_dict(self) -> dict[str, Any]:
return {
"zones": [zone.to_dict() for zone in self.zones],
"node_assignments": {
node_id: assignment.to_dict()
for node_id, assignment in self.node_assignments.items()
},
"boundary_edges": [edge.to_dict() for edge in self.boundary_edges],
"diagnostics": [diagnostic.to_dict() for diagnostic in self.diagnostics],
}
@dataclass(frozen=True)
class _NodeRecord:
id: str
data: Mapping[str, Any]
order: int
@dataclass(frozen=True)
class _EdgeRecord:
id: str
source: str
target: str
edge_type: str
data: Mapping[str, Any]
order: int
@dataclass(frozen=True)
class _Candidate:
node_id: str
zone_id: str
reason: str
depth: int
definition_order: int
def resolve_zones(
nodes: Iterable[Mapping[str, Any]],
edges: Iterable[Mapping[str, Any]],
zone_definitions: Iterable[ZoneDefinition | Mapping[str, Any]],
) -> ZoneResolution:
"""Resolve zone definitions against graph data without mutating the graph."""
node_records = _node_records(nodes)
edge_records = _edge_records(edges)
definitions = tuple(
definition if isinstance(definition, ZoneDefinition) else ZoneDefinition.from_dict(definition)
for definition in zone_definitions
)
enabled_definitions = tuple(
(index, definition)
for index, definition in enumerate(definitions)
if definition.enabled
)
diagnostics: list[ZoneDiagnostic] = []
candidates_by_node_id: dict[str, list[_Candidate]] = defaultdict(list)
seed_node_ids_by_zone_id: dict[str, set[str]] = defaultdict(set)
for definition_order, definition in enabled_definitions:
for node in node_records:
if _rule_matches(node.data, definition.membership, empty_matches=False):
seed_node_ids_by_zone_id[definition.id].add(node.id)
candidates_by_node_id[node.id].append(
_Candidate(node.id, definition.id, "seed", 0, definition_order)
)
if not seed_node_ids_by_zone_id[definition.id]:
diagnostics.append(
ZoneDiagnostic(
severity="WARN",
code="ZONE_EMPTY_SEED_SET",
message=f"Zone {definition.id} did not match any seed nodes.",
zone_ids=(definition.id,),
)
)
attraction_candidates = _attraction_candidates(
node_records,
edge_records,
enabled_definitions,
seed_node_ids_by_zone_id,
diagnostics,
)
for candidate in attraction_candidates:
candidates_by_node_id[candidate.node_id].append(candidate)
height_by_zone_id = {
definition.id: definition.presentation.height
for _, definition in enabled_definitions
}
assignments: dict[str, ZoneNodeAssignment] = {}
for node in node_records:
candidates = candidates_by_node_id.get(node.id, [])
if not candidates:
continue
seed_candidates = _unique_zone_candidates(
candidate for candidate in candidates if candidate.reason == "seed"
)
attraction_candidates_for_node = _unique_zone_candidates(
candidate for candidate in candidates if candidate.reason == "attraction"
)
if len(seed_candidates) > 1:
diagnostics.append(
ZoneDiagnostic(
severity="WARN",
code="ZONE_NODE_SEEDED_BY_MULTIPLE_ZONES",
message=f"Node {node.id} matched multiple zone membership rules.",
node_id=node.id,
zone_ids=tuple(candidate.zone_id for candidate in seed_candidates),
)
)
if len(attraction_candidates_for_node) > 1:
diagnostics.append(
ZoneDiagnostic(
severity="WARN",
code="ZONE_NODE_ATTRACTED_BY_MULTIPLE_ZONES",
message=f"Node {node.id} was attracted by multiple zones.",
node_id=node.id,
zone_ids=tuple(candidate.zone_id for candidate in attraction_candidates_for_node),
)
)
if seed_candidates and any(
candidate.zone_id not in {seed.zone_id for seed in seed_candidates}
for candidate in attraction_candidates_for_node
):
diagnostics.append(
ZoneDiagnostic(
severity="INFO",
code="ZONE_SEED_OVERRIDES_ATTRACTION",
message=f"Node {node.id} kept seed membership over attraction.",
node_id=node.id,
zone_ids=tuple(
candidate.zone_id
for candidate in [*seed_candidates, *attraction_candidates_for_node]
),
)
)
pool = seed_candidates or attraction_candidates_for_node
chosen = _choose_candidate(pool, height_by_zone_id)
assignments[node.id] = ZoneNodeAssignment(
node_id=node.id,
zone_id=chosen.zone_id,
reason=chosen.reason,
depth=chosen.depth,
)
boundary_edges: list[ZoneBoundaryEdge] = []
internal_edge_ids_by_zone_id: dict[str, list[str]] = defaultdict(list)
boundary_edge_ids_by_zone_id: dict[str, list[str]] = defaultdict(list)
for edge in edge_records:
if _is_context_only_edge(edge):
continue
source_zone_id = assignments.get(edge.source).zone_id if edge.source in assignments else None
target_zone_id = assignments.get(edge.target).zone_id if edge.target in assignments else None
if source_zone_id and source_zone_id == target_zone_id:
internal_edge_ids_by_zone_id[source_zone_id].append(edge.id)
continue
if source_zone_id or target_zone_id:
zone_ids = tuple(sorted(str(zone_id) for zone_id in {source_zone_id, target_zone_id} - {None}))
diagnostics.append(
ZoneDiagnostic(
severity="INFO",
code="ZONE_EDGE_CROSSES_ZONE_BOUNDARY",
message=f"Edge {edge.id} crosses a zone boundary.",
edge_id=edge.id,
zone_ids=zone_ids,
)
)
boundary_edges.append(
ZoneBoundaryEdge(
edge_id=edge.id,
source=edge.source,
target=edge.target,
edge_type=edge.edge_type,
source_zone_id=source_zone_id,
target_zone_id=target_zone_id,
)
)
for zone_id in {source_zone_id, target_zone_id} - {None}:
boundary_edge_ids_by_zone_id[str(zone_id)].append(edge.id)
zones: list[ZoneInstance] = []
for _, definition in enabled_definitions:
node_ids = tuple(
node.id
for node in node_records
if node.id in assignments and assignments[node.id].zone_id == definition.id
)
seed_node_ids = tuple(
node_id
for node_id in node_ids
if assignments[node_id].reason == "seed"
)
attracted_node_ids = tuple(
node_id
for node_id in node_ids
if assignments[node_id].reason == "attraction"
)
zone_diagnostics = tuple(
diagnostic
for diagnostic in diagnostics
if definition.id in diagnostic.zone_ids
)
zones.append(
ZoneInstance(
id=definition.id,
label=definition.label or definition.id,
definition=definition,
node_ids=node_ids,
seed_node_ids=seed_node_ids,
attracted_node_ids=attracted_node_ids,
internal_edge_ids=tuple(internal_edge_ids_by_zone_id[definition.id]),
boundary_edge_ids=tuple(boundary_edge_ids_by_zone_id[definition.id]),
diagnostics=zone_diagnostics,
)
)
return ZoneResolution(
zones=tuple(zones),
node_assignments=assignments,
boundary_edges=tuple(boundary_edges),
diagnostics=tuple(diagnostics),
)
def _attraction_candidates(
node_records: tuple[_NodeRecord, ...],
edge_records: tuple[_EdgeRecord, ...],
enabled_definitions: tuple[tuple[int, ZoneDefinition], ...],
seed_node_ids_by_zone_id: Mapping[str, set[str]],
diagnostics: list[ZoneDiagnostic],
) -> list[_Candidate]:
nodes_by_id = {node.id: node for node in node_records}
adjacency: dict[str, list[_EdgeRecord]] = defaultdict(list)
for edge in edge_records:
if _is_context_only_edge(edge):
continue
adjacency[edge.source].append(edge)
adjacency[edge.target].append(edge)
candidates: dict[tuple[str, str], _Candidate] = {}
depth_limit_diagnostics: set[tuple[str, str]] = set()
for definition_order, definition in enabled_definitions:
seed_node_ids = seed_node_ids_by_zone_id.get(definition.id, set())
if not seed_node_ids:
continue
for rule in definition.attraction_rules:
queue: deque[tuple[str, int]] = deque((node_id, 0) for node_id in seed_node_ids)
seen_depth_by_node_id = {node_id: 0 for node_id in seed_node_ids}
while queue:
node_id, depth = queue.popleft()
if depth >= rule.depth:
if _has_matching_attraction_neighbor(
node_id,
adjacency.get(node_id, []),
nodes_by_id,
rule,
):
key = (definition.id, node_id)
if key not in depth_limit_diagnostics:
depth_limit_diagnostics.add(key)
diagnostics.append(
ZoneDiagnostic(
severity="INFO",
code="ZONE_ATTRACTION_DEPTH_LIMIT_REACHED",
message=(
f"Zone {definition.id} reached attraction depth "
f"{rule.depth} at node {node_id}."
),
node_id=node_id,
zone_ids=(definition.id,),
)
)
continue
for edge in adjacency.get(node_id, []):
if not _edge_matches_attraction_rule(edge, rule):
continue
neighbor_id = _neighbor_for_direction(node_id, edge, rule.direction)
if not neighbor_id:
continue
next_depth = depth + 1
if seen_depth_by_node_id.get(neighbor_id, next_depth + 1) <= next_depth:
continue
neighbor = nodes_by_id.get(neighbor_id)
if not neighbor or not _rule_matches(
neighbor.data,
rule.node_filter,
empty_matches=True,
):
continue
seen_depth_by_node_id[neighbor_id] = next_depth
queue.append((neighbor_id, next_depth))
if neighbor_id in seed_node_ids:
continue
key = (neighbor_id, definition.id)
existing = candidates.get(key)
candidate = _Candidate(
node_id=neighbor_id,
zone_id=definition.id,
reason="attraction",
depth=next_depth,
definition_order=definition_order,
)
if existing is None or candidate.depth < existing.depth:
candidates[key] = candidate
return sorted(candidates.values(), key=lambda candidate: (candidate.node_id, candidate.zone_id))
def _has_matching_attraction_neighbor(
node_id: str,
edges: Iterable[_EdgeRecord],
nodes_by_id: Mapping[str, _NodeRecord],
rule: ZoneAttractionRule,
) -> bool:
for edge in edges:
if not _edge_matches_attraction_rule(edge, rule):
continue
neighbor_id = _neighbor_for_direction(node_id, edge, rule.direction)
if not neighbor_id:
continue
neighbor = nodes_by_id.get(neighbor_id)
if neighbor and _rule_matches(neighbor.data, rule.node_filter, empty_matches=True):
return True
return False
def _node_records(nodes: Iterable[Mapping[str, Any]]) -> tuple[_NodeRecord, ...]:
records: list[_NodeRecord] = []
for order, element in enumerate(nodes):
data = _element_data(element)
node_id = str(data.get("id") or data.get("stableKey") or "")
if node_id:
records.append(_NodeRecord(id=node_id, data=data, order=order))
return tuple(records)
def _edge_records(edges: Iterable[Mapping[str, Any]]) -> tuple[_EdgeRecord, ...]:
records: list[_EdgeRecord] = []
for order, element in enumerate(edges):
data = _element_data(element)
source = str(data.get("source") or "")
target = str(data.get("target") or "")
if not source or not target:
continue
edge_type = str(data.get("edgeType") or data.get("type") or data.get("canonicalType") or "")
edge_id = str(data.get("id") or data.get("stableKey") or f"edge:{source}:{target}:{order}")
records.append(
_EdgeRecord(
id=edge_id,
source=source,
target=target,
edge_type=edge_type,
data=data,
order=order,
)
)
return tuple(records)
def _element_data(element: Mapping[str, Any]) -> Mapping[str, Any]:
data = element.get("data")
if isinstance(data, Mapping):
return data
return element
def _edge_matches_attraction_rule(edge: _EdgeRecord, rule: ZoneAttractionRule) -> bool:
if rule.edge_type and rule.edge_type != "*" and edge.edge_type != rule.edge_type:
return False
return _rule_matches(edge.data, rule.edge_filter, empty_matches=True)
def _is_context_only_edge(edge: _EdgeRecord) -> bool:
return _trueish(edge.data.get("displayOnly", edge.data.get("display_only"))) or edge.edge_type == "declares"
def _trueish(value: Any) -> bool:
return value is True or str(value).lower() == "true"
def _neighbor_for_direction(node_id: str, edge: _EdgeRecord, direction: str) -> str:
if direction in {"out", "both"} and edge.source == node_id:
return edge.target
if direction in {"in", "both"} and edge.target == node_id:
return edge.source
return ""
def _rule_matches(data: Mapping[str, Any], rule: Mapping[str, Any], *, empty_matches: bool) -> bool:
if not rule:
return empty_matches
if "all" in rule:
nested = rule.get("all")
return isinstance(nested, Iterable) and all(
isinstance(child, Mapping) and _rule_matches(data, child, empty_matches=False)
for child in nested
)
if "any" in rule:
nested = rule.get("any")
return isinstance(nested, Iterable) and any(
isinstance(child, Mapping) and _rule_matches(data, child, empty_matches=False)
for child in nested
)
if "rules" in rule:
nested = rule.get("rules")
return isinstance(nested, Iterable) and all(
isinstance(child, Mapping) and _rule_matches(data, child, empty_matches=False)
for child in nested
)
field = str(rule.get("field") or "")
op = str(rule.get("op") or "equals")
expected = rule.get("value")
actual = _field_value(data, field)
if op == "equals":
return actual == expected
if op == "in":
if isinstance(expected, str) or not isinstance(expected, Iterable):
return actual == expected
return actual in expected
if op == "exists":
return actual is not None and actual != "" and actual != []
raise ValueError(f"Unsupported zone rule operator: {op}")
def _field_value(data: Mapping[str, Any], field: str) -> Any:
if not field:
return None
current: Any = data
for part in field.split("."):
if not isinstance(current, Mapping) or part not in current:
return None
current = current[part]
return current
def _unique_zone_candidates(candidates: Iterable[_Candidate]) -> list[_Candidate]:
by_zone_id: dict[str, _Candidate] = {}
for candidate in candidates:
existing = by_zone_id.get(candidate.zone_id)
if existing is None or (candidate.depth, candidate.definition_order) < (
existing.depth,
existing.definition_order,
):
by_zone_id[candidate.zone_id] = candidate
return list(by_zone_id.values())
def _choose_candidate(candidates: Iterable[_Candidate], height_by_zone_id: Mapping[str, int]) -> _Candidate:
return sorted(
candidates,
key=lambda candidate: (
-height_by_zone_id.get(candidate.zone_id, 0),
candidate.definition_order,
candidate.zone_id,
candidate.depth,
),
)[0]
def _mapping_or_empty(value: Any) -> Mapping[str, Any]:
return value if isinstance(value, Mapping) else {}
def _mapping_or_none(value: Any) -> Mapping[str, Any] | None:
return value if isinstance(value, Mapping) else None
def _optional_string(value: Any) -> str | None:
if value is None or value == "":
return None
return str(value)