generated from coulomb/repo-seed
662 lines
23 KiB
Python
662 lines
23 KiB
Python
from __future__ import annotations
|
|
|
|
from collections import defaultdict, deque
|
|
from collections.abc import Iterable, Mapping
|
|
from dataclasses import dataclass, field
|
|
from typing import Any
|
|
|
|
|
|
@dataclass(frozen=True)
class ZoneAttractionRule:
    """Immutable rule describing which edges a zone follows to attract nodes.

    Construction validates the rule: ``direction`` must be ``"out"``, ``"in"``
    or ``"both"`` and ``depth`` must not be negative.
    """

    # Edge type to follow; a falsy value or "*" places no restriction on type.
    edge_type: str | None = None
    # "out" walks source -> target, "in" walks target -> source, "both" either.
    direction: str = "both"
    # Maximum number of hops away from a seed node that may be attracted.
    depth: int = 1
    # Rule the attracted node's data must satisfy; an empty rule accepts all.
    node_filter: Mapping[str, Any] = field(default_factory=dict)
    # Rule the traversed edge's data must satisfy; an empty rule accepts all.
    edge_filter: Mapping[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        """Reject unsupported directions and negative depths.

        Raises:
            ValueError: If ``direction`` or ``depth`` is invalid.
        """
        direction_is_known = self.direction in {"out", "in", "both"}
        if not direction_is_known:
            raise ValueError(f"Unsupported attraction direction: {self.direction}")
        if self.depth < 0:
            raise ValueError("Attraction depth must be zero or greater")

    @classmethod
    def from_dict(cls, data: Mapping[str, Any]) -> ZoneAttractionRule:
        """Build a rule from a mapping.

        Snake-case keys are read first; ``edgeType``, ``nodeFilter`` and
        ``edgeFilter`` are used only when the snake-case key is absent.
        """
        raw_edge_type = data.get("edge_type", data.get("edgeType"))
        raw_node_filter = data.get("node_filter", data.get("nodeFilter"))
        raw_edge_filter = data.get("edge_filter", data.get("edgeFilter"))
        return cls(
            edge_type=_optional_string(raw_edge_type),
            direction=str(data.get("direction") or "both"),
            depth=int(data.get("depth", 1)),
            node_filter=_mapping_or_empty(raw_node_filter),
            edge_filter=_mapping_or_empty(raw_edge_filter),
        )

    def to_dict(self) -> dict[str, Any]:
        """Serialize the rule, omitting ``edge_type`` and filters when empty."""
        serialized: dict[str, Any] = {"direction": self.direction, "depth": self.depth}
        if self.edge_type:
            serialized["edge_type"] = self.edge_type
        optional_filters = (
            ("node_filter", self.node_filter),
            ("edge_filter", self.edge_filter),
        )
        for key, rule_filter in optional_filters:
            if rule_filter:
                # Copy so callers cannot mutate the rule through the result.
                serialized[key] = dict(rule_filter)
        return serialized
|
|
|
|
|
|
@dataclass(frozen=True)
class ZoneLayout:
    """Layout settings for a zone: an algorithm name plus free-form options."""

    # Name of the layout algorithm; "preset" is the default.
    algorithm: str = "preset"
    # Algorithm-specific options, passed through unchanged.
    options: Mapping[str, Any] = field(default_factory=dict)

    @classmethod
    def from_dict(cls, data: Mapping[str, Any] | None) -> ZoneLayout:
        """Build a layout from a mapping; ``None`` or empty yields the defaults."""
        if data:
            algorithm_name = str(data.get("algorithm") or "preset")
            return cls(
                algorithm=algorithm_name,
                options=_mapping_or_empty(data.get("options")),
            )
        return cls()

    def to_dict(self) -> dict[str, Any]:
        """Serialize the layout; ``options`` is copied into a plain dict."""
        options_copy = dict(self.options)
        return {"algorithm": self.algorithm, "options": options_copy}
|
|
|
|
|
|
@dataclass(frozen=True)
class ZonePresentation:
    """Visual presentation settings for a zone."""

    # Stacking height; when a node qualifies for several zones, the zone with
    # the greater height is preferred by the resolver.
    height: int = 0
    # Display color; an empty string means none was configured.
    color: str = ""
    # Opacity value (presumably in the 0..1 range - confirm with the renderer).
    opacity: float = 0.16
    # Flag forwarded to the renderer unchanged.
    blur_below: bool = False

    @classmethod
    def from_dict(cls, data: Mapping[str, Any] | None) -> ZonePresentation:
        """Build a presentation from a mapping.

        ``None`` or an empty mapping yields the defaults. An explicit null for
        ``height`` or ``opacity`` (common in JSON-sourced configuration) falls
        back to the field default instead of raising ``TypeError``, matching
        how ``color`` and ``blur_below`` already tolerate null. ``blurBelow``
        is used only when ``blur_below`` is absent.

        Raises:
            ValueError: If ``height`` or ``opacity`` cannot be converted.
        """
        if not data:
            return cls()
        raw_height = data.get("height")
        raw_opacity = data.get("opacity")
        return cls(
            # Compare against None explicitly so that 0 / 0.0 stay valid values.
            height=0 if raw_height is None else int(raw_height),
            color=str(data.get("color") or ""),
            opacity=0.16 if raw_opacity is None else float(raw_opacity),
            blur_below=bool(data.get("blur_below", data.get("blurBelow", False))),
        )

    def to_dict(self) -> dict[str, Any]:
        """Serialize the presentation using snake_case keys."""
        return {
            "height": self.height,
            "color": self.color,
            "opacity": self.opacity,
            "blur_below": self.blur_below,
        }
|
|
|
|
|
|
@dataclass(frozen=True)
class ZoneCollapse:
    """Collapse settings for a zone: an on/off flag and a label."""

    # Whether collapsing is switched on for the zone.
    enabled: bool = False
    # Label associated with the collapsed zone; empty when not configured.
    label: str = ""

    @classmethod
    def from_dict(cls, data: Mapping[str, Any] | None) -> ZoneCollapse:
        """Build collapse settings; ``None`` or empty yields the defaults."""
        if data:
            is_enabled = bool(data.get("enabled", False))
            label_text = str(data.get("label") or "")
            return cls(enabled=is_enabled, label=label_text)
        return cls()

    def to_dict(self) -> dict[str, Any]:
        """Serialize the collapse settings."""
        return {name: getattr(self, name) for name in ("enabled", "label")}
|
|
|
|
|
|
@dataclass(frozen=True)
class ZoneDefinition:
    """Declarative description of a zone.

    A definition combines a membership rule (selecting seed nodes), attraction
    rules (pulling in nodes reachable from the seeds) and display settings.
    """

    # Unique identifier of the zone.
    id: str
    # Human-readable label; consumers fall back to ``id`` when it is empty.
    label: str = ""
    # Disabled definitions are skipped during resolution.
    enabled: bool = True
    # Rule selecting seed nodes; an empty rule matches no node.
    membership: Mapping[str, Any] = field(default_factory=dict)
    # Rules used to attract additional nodes from the seed nodes.
    attraction_rules: tuple[ZoneAttractionRule, ...] = ()
    layout: ZoneLayout = field(default_factory=ZoneLayout)
    presentation: ZonePresentation = field(default_factory=ZonePresentation)
    collapse: ZoneCollapse = field(default_factory=ZoneCollapse)

    @classmethod
    def from_dict(cls, data: Mapping[str, Any]) -> ZoneDefinition:
        """Build a definition from a mapping.

        ``attraction`` may be a ``{"rules": [...]}`` wrapper or a bare iterable
        of rules. Entries that are neither mappings nor ``ZoneAttractionRule``
        instances are ignored. A null, scalar or string value for
        ``attraction`` (or its ``rules``) is treated as "no rules" rather than
        raising ``TypeError`` while iterating.

        Raises:
            KeyError: If ``data`` has no ``"id"`` key.
            ValueError: If an attraction rule fails validation.
        """
        attraction = data.get("attraction")
        raw_rules = attraction.get("rules") if isinstance(attraction, Mapping) else attraction
        # None and other non-iterables cannot be looped over; strings are
        # iterable but can only ever yield characters, never rules.
        if isinstance(raw_rules, (str, bytes)) or not isinstance(raw_rules, Iterable):
            raw_rules = ()
        rules = tuple(
            rule if isinstance(rule, ZoneAttractionRule) else ZoneAttractionRule.from_dict(rule)
            for rule in raw_rules
            if isinstance(rule, (Mapping, ZoneAttractionRule))
        )
        zone_id = str(data["id"])
        return cls(
            id=zone_id,
            label=str(data.get("label") or zone_id),
            enabled=bool(data.get("enabled", True)),
            membership=_mapping_or_empty(data.get("membership")),
            attraction_rules=rules,
            layout=ZoneLayout.from_dict(_mapping_or_none(data.get("layout"))),
            presentation=ZonePresentation.from_dict(_mapping_or_none(data.get("presentation"))),
            collapse=ZoneCollapse.from_dict(_mapping_or_none(data.get("collapse"))),
        )

    def to_dict(self) -> dict[str, Any]:
        """Serialize the definition; rules are nested under ``attraction.rules``."""
        return {
            "id": self.id,
            "label": self.label or self.id,
            "enabled": self.enabled,
            "membership": dict(self.membership),
            "attraction": {"rules": [rule.to_dict() for rule in self.attraction_rules]},
            "layout": self.layout.to_dict(),
            "presentation": self.presentation.to_dict(),
            "collapse": self.collapse.to_dict(),
        }
|
|
|
|
|
|
@dataclass(frozen=True)
class ZoneNodeAssignment:
    """Final assignment of one node to one zone."""

    node_id: str
    zone_id: str
    # Why the node belongs to the zone: "seed" or "attraction".
    reason: str
    # Hop distance from a seed node; 0 for seed nodes themselves.
    depth: int = 0

    def to_dict(self) -> dict[str, Any]:
        """Serialize the assignment as a plain dict in field order."""
        field_names = ("node_id", "zone_id", "reason", "depth")
        return {name: getattr(self, name) for name in field_names}
|
|
|
|
|
|
@dataclass(frozen=True)
class ZoneBoundaryEdge:
    """An edge whose endpoints do not both belong to the same zone."""

    edge_id: str
    source: str
    target: str
    edge_type: str
    # Zone of the source node, or None when the source is unassigned.
    source_zone_id: str | None
    # Zone of the target node, or None when the target is unassigned.
    target_zone_id: str | None

    def to_dict(self) -> dict[str, Any]:
        """Serialize the boundary edge as a plain dict in field order."""
        field_names = (
            "edge_id",
            "source",
            "target",
            "edge_type",
            "source_zone_id",
            "target_zone_id",
        )
        return {name: getattr(self, name) for name in field_names}
|
|
|
|
|
|
@dataclass(frozen=True)
class ZoneDiagnostic:
    """A message produced while resolving zones."""

    # Severity label, e.g. "WARN" or "INFO".
    severity: str
    # Stable machine-readable code, e.g. "ZONE_EMPTY_SEED_SET".
    code: str
    # Human-readable description.
    message: str
    # Node the diagnostic refers to, if any.
    node_id: str | None = None
    # Edge the diagnostic refers to, if any.
    edge_id: str | None = None
    # Zones involved; used to attach the diagnostic to zone instances.
    zone_ids: tuple[str, ...] = ()

    def to_dict(self) -> dict[str, Any]:
        """Serialize the diagnostic, omitting ``node_id``/``edge_id`` when falsy."""
        payload: dict[str, Any] = {
            "severity": self.severity,
            "code": self.code,
            "message": self.message,
            "zone_ids": list(self.zone_ids),
        }
        for optional_key in ("node_id", "edge_id"):
            optional_value = getattr(self, optional_key)
            if optional_value:
                payload[optional_key] = optional_value
        return payload
|
|
|
|
|
|
@dataclass(frozen=True)
class ZoneInstance:
    """A resolved zone: its definition plus the nodes and edges assigned to it."""

    id: str
    label: str
    definition: ZoneDefinition
    # Every node assigned to the zone.
    node_ids: tuple[str, ...]
    # Subset of ``node_ids`` assigned because they matched the membership rule.
    seed_node_ids: tuple[str, ...]
    # Subset of ``node_ids`` assigned through an attraction rule.
    attracted_node_ids: tuple[str, ...]
    # Edges whose two endpoints are both assigned to this zone.
    internal_edge_ids: tuple[str, ...]
    # Edges with exactly one endpoint assigned to this zone.
    boundary_edge_ids: tuple[str, ...]
    # Diagnostics that mention this zone.
    diagnostics: tuple[ZoneDiagnostic, ...]

    def to_dict(self) -> dict[str, Any]:
        """Serialize the instance; tuples become lists, nested objects dicts."""
        serialized: dict[str, Any] = {
            "id": self.id,
            "label": self.label,
            "definition": self.definition.to_dict(),
        }
        id_collections = (
            "node_ids",
            "seed_node_ids",
            "attracted_node_ids",
            "internal_edge_ids",
            "boundary_edge_ids",
        )
        for name in id_collections:
            serialized[name] = list(getattr(self, name))
        serialized["diagnostics"] = [entry.to_dict() for entry in self.diagnostics]
        return serialized
|
|
|
|
|
|
@dataclass(frozen=True)
class ZoneResolution:
    """Complete result of resolving zone definitions against a graph."""

    # One instance per enabled definition, in definition order.
    zones: tuple[ZoneInstance, ...]
    # Assignment per node id; nodes without a zone are absent.
    node_assignments: Mapping[str, ZoneNodeAssignment]
    boundary_edges: tuple[ZoneBoundaryEdge, ...]
    diagnostics: tuple[ZoneDiagnostic, ...]

    def zone_by_id(self, zone_id: str) -> ZoneInstance | None:
        """Return the first zone whose id equals ``zone_id``, or ``None``."""
        for zone in self.zones:
            if zone.id == zone_id:
                return zone
        return None

    def to_dict(self) -> dict[str, Any]:
        """Serialize the resolution into plain dicts and lists."""
        serialized_zones = [zone.to_dict() for zone in self.zones]
        serialized_assignments = {
            node_id: assignment.to_dict()
            for node_id, assignment in self.node_assignments.items()
        }
        serialized_edges = [edge.to_dict() for edge in self.boundary_edges]
        serialized_diagnostics = [entry.to_dict() for entry in self.diagnostics]
        return {
            "zones": serialized_zones,
            "node_assignments": serialized_assignments,
            "boundary_edges": serialized_edges,
            "diagnostics": serialized_diagnostics,
        }
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class _NodeRecord:
|
|
id: str
|
|
data: Mapping[str, Any]
|
|
order: int
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class _EdgeRecord:
|
|
id: str
|
|
source: str
|
|
target: str
|
|
edge_type: str
|
|
data: Mapping[str, Any]
|
|
order: int
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class _Candidate:
|
|
node_id: str
|
|
zone_id: str
|
|
reason: str
|
|
depth: int
|
|
definition_order: int
|
|
|
|
|
|
def resolve_zones(
    nodes: Iterable[Mapping[str, Any]],
    edges: Iterable[Mapping[str, Any]],
    zone_definitions: Iterable[ZoneDefinition | Mapping[str, Any]],
) -> ZoneResolution:
    """Resolve zone definitions against graph data without mutating the graph.

    The resolution runs in five steps:

    1. Seed nodes are collected per enabled definition via its membership rule.
    2. Attraction rules add further candidates reachable from the seeds.
    3. Each node with candidates is assigned to one zone; seed candidates
       always win over attraction candidates.
    4. Edges are classified as internal to a zone or as boundary edges.
    5. One ``ZoneInstance`` is built per enabled definition.

    Args:
        nodes: Node elements, either flat mappings or wrapped in ``"data"``.
        edges: Edge elements in the same shape as ``nodes``.
        zone_definitions: ``ZoneDefinition`` objects or mappings to parse.

    Returns:
        The zones, per-node assignments, boundary edges and diagnostics.
    """
    node_records = _node_records(nodes)
    edge_records = _edge_records(edges)
    parsed_definitions = tuple(
        item if isinstance(item, ZoneDefinition) else ZoneDefinition.from_dict(item)
        for item in zone_definitions
    )
    # Keep the original index so ties can be broken by definition order.
    enabled_definitions = tuple(
        (position, parsed)
        for position, parsed in enumerate(parsed_definitions)
        if parsed.enabled
    )

    diagnostics: list[ZoneDiagnostic] = []

    def report(
        severity: str,
        code: str,
        message: str,
        zone_ids: tuple[str, ...],
        node_id: str | None = None,
    ) -> None:
        """Append one diagnostic to the shared list."""
        diagnostics.append(
            ZoneDiagnostic(
                severity=severity,
                code=code,
                message=message,
                node_id=node_id,
                zone_ids=zone_ids,
            )
        )

    # Step 1: seed candidates from membership rules.
    candidates_by_node_id: dict[str, list[_Candidate]] = defaultdict(list)
    seed_node_ids_by_zone_id: dict[str, set[str]] = defaultdict(set)
    for definition_order, definition in enabled_definitions:
        zone_seed_ids = seed_node_ids_by_zone_id[definition.id]
        for node in node_records:
            if not _rule_matches(node.data, definition.membership, empty_matches=False):
                continue
            zone_seed_ids.add(node.id)
            candidates_by_node_id[node.id].append(
                _Candidate(
                    node_id=node.id,
                    zone_id=definition.id,
                    reason="seed",
                    depth=0,
                    definition_order=definition_order,
                )
            )
        if not zone_seed_ids:
            report(
                "WARN",
                "ZONE_EMPTY_SEED_SET",
                f"Zone {definition.id} did not match any seed nodes.",
                (definition.id,),
            )

    # Step 2: attraction candidates reachable from the seeds.
    for attracted_candidate in _attraction_candidates(
        node_records,
        edge_records,
        enabled_definitions,
        seed_node_ids_by_zone_id,
    ):
        candidates_by_node_id[attracted_candidate.node_id].append(attracted_candidate)

    # Step 3: choose one zone per node.
    height_by_zone_id = {
        definition.id: definition.presentation.height
        for _, definition in enabled_definitions
    }
    assignments: dict[str, ZoneNodeAssignment] = {}
    for node in node_records:
        node_candidates = candidates_by_node_id.get(node.id)
        if not node_candidates:
            continue
        seeds = _unique_zone_candidates(
            entry for entry in node_candidates if entry.reason == "seed"
        )
        attracted = _unique_zone_candidates(
            entry for entry in node_candidates if entry.reason == "attraction"
        )
        if len(seeds) > 1:
            report(
                "WARN",
                "ZONE_NODE_SEEDED_BY_MULTIPLE_ZONES",
                f"Node {node.id} matched multiple zone membership rules.",
                tuple(entry.zone_id for entry in seeds),
                node.id,
            )
        if len(attracted) > 1:
            report(
                "WARN",
                "ZONE_NODE_ATTRACTED_BY_MULTIPLE_ZONES",
                f"Node {node.id} was attracted by multiple zones.",
                tuple(entry.zone_id for entry in attracted),
                node.id,
            )
        seed_zone_ids = {entry.zone_id for entry in seeds}
        if seeds and any(entry.zone_id not in seed_zone_ids for entry in attracted):
            report(
                "INFO",
                "ZONE_SEED_OVERRIDES_ATTRACTION",
                f"Node {node.id} kept seed membership over attraction.",
                tuple(entry.zone_id for entry in (*seeds, *attracted)),
                node.id,
            )
        # Attraction candidates are only considered when there is no seed.
        winner = _choose_candidate(seeds or attracted, height_by_zone_id)
        assignments[node.id] = ZoneNodeAssignment(
            node_id=node.id,
            zone_id=winner.zone_id,
            reason=winner.reason,
            depth=winner.depth,
        )

    # Step 4: classify edges as internal or boundary.
    boundary_edges: list[ZoneBoundaryEdge] = []
    internal_edge_ids_by_zone_id: dict[str, list[str]] = defaultdict(list)
    boundary_edge_ids_by_zone_id: dict[str, list[str]] = defaultdict(list)
    for edge in edge_records:
        source_assignment = assignments.get(edge.source)
        target_assignment = assignments.get(edge.target)
        source_zone_id = None if source_assignment is None else source_assignment.zone_id
        target_zone_id = None if target_assignment is None else target_assignment.zone_id
        if source_zone_id and source_zone_id == target_zone_id:
            internal_edge_ids_by_zone_id[source_zone_id].append(edge.id)
            continue
        if not (source_zone_id or target_zone_id):
            # Neither endpoint belongs to a zone: the edge is irrelevant here.
            continue
        boundary_edges.append(
            ZoneBoundaryEdge(
                edge_id=edge.id,
                source=edge.source,
                target=edge.target,
                edge_type=edge.edge_type,
                source_zone_id=source_zone_id,
                target_zone_id=target_zone_id,
            )
        )
        for touched_zone_id in {source_zone_id, target_zone_id} - {None}:
            boundary_edge_ids_by_zone_id[str(touched_zone_id)].append(edge.id)

    # Step 5: build one instance per enabled definition.
    zones: list[ZoneInstance] = []
    for _, definition in enabled_definitions:
        zone_id = definition.id
        member_ids = tuple(
            node.id
            for node in node_records
            if node.id in assignments and assignments[node.id].zone_id == zone_id
        )
        seeded_ids = tuple(
            member_id for member_id in member_ids if assignments[member_id].reason == "seed"
        )
        attracted_ids = tuple(
            member_id
            for member_id in member_ids
            if assignments[member_id].reason == "attraction"
        )
        related_diagnostics = tuple(
            entry for entry in diagnostics if zone_id in entry.zone_ids
        )
        zones.append(
            ZoneInstance(
                id=zone_id,
                label=definition.label or zone_id,
                definition=definition,
                node_ids=member_ids,
                seed_node_ids=seeded_ids,
                attracted_node_ids=attracted_ids,
                internal_edge_ids=tuple(internal_edge_ids_by_zone_id[zone_id]),
                boundary_edge_ids=tuple(boundary_edge_ids_by_zone_id[zone_id]),
                diagnostics=related_diagnostics,
            )
        )

    return ZoneResolution(
        zones=tuple(zones),
        node_assignments=assignments,
        boundary_edges=tuple(boundary_edges),
        diagnostics=tuple(diagnostics),
    )
|
|
|
|
|
|
def _attraction_candidates(
|
|
node_records: tuple[_NodeRecord, ...],
|
|
edge_records: tuple[_EdgeRecord, ...],
|
|
enabled_definitions: tuple[tuple[int, ZoneDefinition], ...],
|
|
seed_node_ids_by_zone_id: Mapping[str, set[str]],
|
|
) -> list[_Candidate]:
|
|
nodes_by_id = {node.id: node for node in node_records}
|
|
adjacency: dict[str, list[_EdgeRecord]] = defaultdict(list)
|
|
for edge in edge_records:
|
|
adjacency[edge.source].append(edge)
|
|
adjacency[edge.target].append(edge)
|
|
|
|
candidates: dict[tuple[str, str], _Candidate] = {}
|
|
for definition_order, definition in enabled_definitions:
|
|
seed_node_ids = seed_node_ids_by_zone_id.get(definition.id, set())
|
|
if not seed_node_ids:
|
|
continue
|
|
for rule in definition.attraction_rules:
|
|
queue: deque[tuple[str, int]] = deque((node_id, 0) for node_id in seed_node_ids)
|
|
seen_depth_by_node_id = {node_id: 0 for node_id in seed_node_ids}
|
|
while queue:
|
|
node_id, depth = queue.popleft()
|
|
if depth >= rule.depth:
|
|
continue
|
|
for edge in adjacency.get(node_id, []):
|
|
if not _edge_matches_attraction_rule(edge, rule):
|
|
continue
|
|
neighbor_id = _neighbor_for_direction(node_id, edge, rule.direction)
|
|
if not neighbor_id:
|
|
continue
|
|
next_depth = depth + 1
|
|
if seen_depth_by_node_id.get(neighbor_id, next_depth + 1) <= next_depth:
|
|
continue
|
|
neighbor = nodes_by_id.get(neighbor_id)
|
|
if not neighbor or not _rule_matches(
|
|
neighbor.data,
|
|
rule.node_filter,
|
|
empty_matches=True,
|
|
):
|
|
continue
|
|
seen_depth_by_node_id[neighbor_id] = next_depth
|
|
queue.append((neighbor_id, next_depth))
|
|
if neighbor_id in seed_node_ids:
|
|
continue
|
|
key = (neighbor_id, definition.id)
|
|
existing = candidates.get(key)
|
|
candidate = _Candidate(
|
|
node_id=neighbor_id,
|
|
zone_id=definition.id,
|
|
reason="attraction",
|
|
depth=next_depth,
|
|
definition_order=definition_order,
|
|
)
|
|
if existing is None or candidate.depth < existing.depth:
|
|
candidates[key] = candidate
|
|
return sorted(candidates.values(), key=lambda candidate: (candidate.node_id, candidate.zone_id))
|
|
|
|
|
|
def _node_records(nodes: Iterable[Mapping[str, Any]]) -> tuple[_NodeRecord, ...]:
|
|
records: list[_NodeRecord] = []
|
|
for order, element in enumerate(nodes):
|
|
data = _element_data(element)
|
|
node_id = str(data.get("id") or data.get("stableKey") or "")
|
|
if node_id:
|
|
records.append(_NodeRecord(id=node_id, data=data, order=order))
|
|
return tuple(records)
|
|
|
|
|
|
def _edge_records(edges: Iterable[Mapping[str, Any]]) -> tuple[_EdgeRecord, ...]:
|
|
records: list[_EdgeRecord] = []
|
|
for order, element in enumerate(edges):
|
|
data = _element_data(element)
|
|
source = str(data.get("source") or "")
|
|
target = str(data.get("target") or "")
|
|
if not source or not target:
|
|
continue
|
|
edge_type = str(data.get("edgeType") or data.get("type") or data.get("canonicalType") or "")
|
|
edge_id = str(data.get("id") or data.get("stableKey") or f"edge:{source}:{target}:{order}")
|
|
records.append(
|
|
_EdgeRecord(
|
|
id=edge_id,
|
|
source=source,
|
|
target=target,
|
|
edge_type=edge_type,
|
|
data=data,
|
|
order=order,
|
|
)
|
|
)
|
|
return tuple(records)
|
|
|
|
|
|
def _element_data(element: Mapping[str, Any]) -> Mapping[str, Any]:
|
|
data = element.get("data")
|
|
if isinstance(data, Mapping):
|
|
return data
|
|
return element
|
|
|
|
|
|
def _edge_matches_attraction_rule(edge: _EdgeRecord, rule: ZoneAttractionRule) -> bool:
|
|
if rule.edge_type and rule.edge_type != "*" and edge.edge_type != rule.edge_type:
|
|
return False
|
|
return _rule_matches(edge.data, rule.edge_filter, empty_matches=True)
|
|
|
|
|
|
def _neighbor_for_direction(node_id: str, edge: _EdgeRecord, direction: str) -> str:
|
|
if direction in {"out", "both"} and edge.source == node_id:
|
|
return edge.target
|
|
if direction in {"in", "both"} and edge.target == node_id:
|
|
return edge.source
|
|
return ""
|
|
|
|
|
|
def _rule_matches(data: Mapping[str, Any], rule: Mapping[str, Any], *, empty_matches: bool) -> bool:
|
|
if not rule:
|
|
return empty_matches
|
|
if "all" in rule:
|
|
nested = rule.get("all")
|
|
return isinstance(nested, Iterable) and all(
|
|
isinstance(child, Mapping) and _rule_matches(data, child, empty_matches=False)
|
|
for child in nested
|
|
)
|
|
if "any" in rule:
|
|
nested = rule.get("any")
|
|
return isinstance(nested, Iterable) and any(
|
|
isinstance(child, Mapping) and _rule_matches(data, child, empty_matches=False)
|
|
for child in nested
|
|
)
|
|
if "rules" in rule:
|
|
nested = rule.get("rules")
|
|
return isinstance(nested, Iterable) and all(
|
|
isinstance(child, Mapping) and _rule_matches(data, child, empty_matches=False)
|
|
for child in nested
|
|
)
|
|
|
|
field = str(rule.get("field") or "")
|
|
op = str(rule.get("op") or "equals")
|
|
expected = rule.get("value")
|
|
actual = _field_value(data, field)
|
|
if op == "equals":
|
|
return actual == expected
|
|
if op == "in":
|
|
if isinstance(expected, str) or not isinstance(expected, Iterable):
|
|
return actual == expected
|
|
return actual in expected
|
|
if op == "exists":
|
|
return actual is not None and actual != "" and actual != []
|
|
raise ValueError(f"Unsupported zone rule operator: {op}")
|
|
|
|
|
|
def _field_value(data: Mapping[str, Any], field: str) -> Any:
|
|
if not field:
|
|
return None
|
|
current: Any = data
|
|
for part in field.split("."):
|
|
if not isinstance(current, Mapping) or part not in current:
|
|
return None
|
|
current = current[part]
|
|
return current
|
|
|
|
|
|
def _unique_zone_candidates(candidates: Iterable[_Candidate]) -> list[_Candidate]:
|
|
by_zone_id: dict[str, _Candidate] = {}
|
|
for candidate in candidates:
|
|
existing = by_zone_id.get(candidate.zone_id)
|
|
if existing is None or (candidate.depth, candidate.definition_order) < (
|
|
existing.depth,
|
|
existing.definition_order,
|
|
):
|
|
by_zone_id[candidate.zone_id] = candidate
|
|
return list(by_zone_id.values())
|
|
|
|
|
|
def _choose_candidate(candidates: Iterable[_Candidate], height_by_zone_id: Mapping[str, int]) -> _Candidate:
|
|
return sorted(
|
|
candidates,
|
|
key=lambda candidate: (
|
|
-height_by_zone_id.get(candidate.zone_id, 0),
|
|
candidate.definition_order,
|
|
candidate.zone_id,
|
|
candidate.depth,
|
|
),
|
|
)[0]
|
|
|
|
|
|
def _mapping_or_empty(value: Any) -> Mapping[str, Any]:
|
|
return value if isinstance(value, Mapping) else {}
|
|
|
|
|
|
def _mapping_or_none(value: Any) -> Mapping[str, Any] | None:
|
|
return value if isinstance(value, Mapping) else None
|
|
|
|
|
|
def _optional_string(value: Any) -> str | None:
|
|
if value is None or value == "":
|
|
return None
|
|
return str(value)
|