generated from coulomb/repo-seed
84 lines
2.8 KiB
Python
84 lines
2.8 KiB
Python
"""Relationship graph helpers."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
|
|
from .artifacts import Relationship
|
|
from .errors import ValidationError
|
|
|
|
|
|
@dataclass
class RelationshipGraph:
    """In-memory directed graph over artifact relationships.

    Every relationship is treated as an edge pointing from its
    ``source_artifact_id`` to its ``target_artifact_id``. The node set is
    derived from the edges, so an id that appears in no relationship is not
    part of the graph.
    """

    # Edges in insertion order; duplicates are stored as given.
    edges: list[Relationship] = field(default_factory=list)

    @property
    def nodes(self) -> set[str]:
        """Return the ids that appear on either end of at least one edge."""
        nodes: set[str] = set()
        for edge in self.edges:
            nodes.add(edge.source_artifact_id)
            nodes.add(edge.target_artifact_id)
        return nodes

    def add(self, relationship: Relationship) -> None:
        """Append ``relationship`` to the edge list."""
        self.edges.append(relationship)

    def successors(self, artifact_id: str) -> set[str]:
        """Return the targets of every edge whose source is ``artifact_id``."""
        return {
            edge.target_artifact_id
            for edge in self.edges
            if edge.source_artifact_id == artifact_id
        }

    def predecessors(self, artifact_id: str) -> set[str]:
        """Return the sources of every edge whose target is ``artifact_id``."""
        return {
            edge.source_artifact_id
            for edge in self.edges
            if edge.target_artifact_id == artifact_id
        }

    def _adjacency(self) -> dict[str, list[str]]:
        """Map every node to the sorted, de-duplicated list of its successors.

        Built with a single pass over the edges so traversals do not have to
        rescan the whole edge list for each visited node.
        """
        targets: dict[str, set[str]] = {}
        for edge in self.edges:
            targets.setdefault(edge.source_artifact_id, set()).add(edge.target_artifact_id)
            # Nodes that only ever appear as a target still need an entry.
            targets.setdefault(edge.target_artifact_id, set())
        return {node: sorted(neighbors) for node, neighbors in targets.items()}

    def detect_cycles(self) -> list[list[str]]:
        """Return the cycles found by a depth-first traversal of the graph.

        Each cycle is a list of node ids that follows edge direction and
        starts and ends with the same id (a self-loop on ``x`` is reported as
        ``["x", "x"]``). One cycle is reported per back edge met during the
        traversal, so this is not an enumeration of every elementary cycle.

        Roots and successors are both visited in sorted order, which makes
        the result independent of set iteration order. The traversal uses an
        explicit stack, so long chains cannot exhaust the recursion limit.

        Returns:
            The detected cycles, or an empty list when the graph is acyclic.
        """
        adjacency = self._adjacency()
        color = dict.fromkeys(adjacency, "white")
        cycles: list[list[str]] = []

        for root in sorted(adjacency):
            if color[root] != "white":
                continue
            color[root] = "gray"
            # ``path`` holds the gray nodes from the root down to the current
            # node; ``pending[i]`` iterates the unexplored successors of
            # ``path[i]``.
            path = [root]
            pending = [iter(adjacency[root])]
            while pending:
                for neighbor in pending[-1]:
                    state = color[neighbor]
                    if state == "gray":
                        # Back edge: the cycle is the tail of the current
                        # path starting at ``neighbor``, closed by repeating it.
                        cycles.append(path[path.index(neighbor):] + [neighbor])
                    elif state == "white":
                        color[neighbor] = "gray"
                        path.append(neighbor)
                        pending.append(iter(adjacency[neighbor]))
                        break  # descend; the parent's iterator resumes later
                else:
                    # Every successor handled: the node is finished.
                    color[path.pop()] = "black"
                    pending.pop()
        return cycles

    def topological_sort(self) -> list[str]:
        """Return all nodes in depth-first post-order.

        In the returned list every node appears after all of its successors,
        i.e. the target of an edge always precedes its source. Roots and
        successors are visited in sorted order, so the result is
        deterministic.

        Returns:
            The ordered node ids; empty when the graph has no edges.

        Raises:
            ValidationError: If the graph contains a cycle. The detected
                cycles are attached under ``details["cycles"]``.
        """
        cycles = self.detect_cycles()
        if cycles:
            raise ValidationError("Relationship graph contains cycles", details={"cycles": cycles})

        adjacency = self._adjacency()
        visited: set[str] = set()
        result: list[str] = []

        for root in sorted(adjacency):
            if root in visited:
                continue
            visited.add(root)
            # Explicit stack of (node, iterator over its remaining successors).
            stack = [(root, iter(adjacency[root]))]
            while stack:
                node, remaining = stack[-1]
                for successor in remaining:
                    if successor not in visited:
                        visited.add(successor)
                        stack.append((successor, iter(adjacency[successor])))
                        break  # descend before finishing ``node``
                else:
                    # All successors emitted already, so ``node`` can follow.
                    result.append(node)
                    stack.pop()
        return result

    @classmethod
    def from_relationships(cls, relationships: list[Relationship]) -> "RelationshipGraph":
        """Build a graph from ``relationships``, copying the list so the caller's is not shared."""
        return cls(edges=list(relationships))
|