Files
markitect-main/tests/unit/analysis/test_graph.py
tegwick bad01e32bd feat(analysis): add graph analysis utilities with networkx (S1.4)
Add connected components, betweenness centrality, Louvain community
detection, modularity scoring, degree distribution, and cohesion/coupling
computation. Wraps DependencyGraph via networkx (optional dependency)
for downstream collection-level coherence metrics.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 01:34:53 +01:00

255 lines
8.1 KiB
Python

"""Tests for markitect.analysis.graph."""
import pytest
nx = pytest.importorskip("networkx", reason="networkx not installed")
from markitect.prompts.dependencies.models import DependencyGraph, EdgeType
from markitect.analysis.graph import (
to_networkx,
connected_components,
betweenness_centrality,
detect_communities,
modularity_score,
degree_distribution,
cohesion_coupling,
)
# ── Helpers ──────────────────────────────────────────────────────────
def _linear_graph():
    """Build the four-node chain A -> B -> C -> D."""
    chain = DependencyGraph()
    # Edges are inserted head-to-tail so node insertion order is A, B, C, D.
    for source, target in (("A", "B"), ("B", "C"), ("C", "D")):
        chain.add_edge(source, target)
    return chain
def _two_clusters():
    """Build two triangles joined by one bridge edge.

    Cluster 1: A, B, C with an edge in each direction between every pair.
    Cluster 2: X, Y, Z with an edge in each direction between every pair.
    Bridge:    a single one-way edge C -> X.

    That is 6 + 6 intra-cluster edges plus 1 bridge, 13 edges in total.
    """
    cluster_one = (
        ("A", "B"), ("B", "A"),
        ("B", "C"), ("C", "B"),
        ("A", "C"), ("C", "A"),
    )
    cluster_two = (
        ("X", "Y"), ("Y", "X"),
        ("Y", "Z"), ("Z", "Y"),
        ("X", "Z"), ("Z", "X"),
    )
    bridge = (("C", "X"),)

    graph = DependencyGraph()
    # Insert cluster 1, then cluster 2, then the bridge, in that order.
    for source, target in cluster_one + cluster_two + bridge:
        graph.add_edge(source, target)
    return graph
def _disconnected_graph():
    """Build a graph with two unrelated edges: A -> B and X -> Y."""
    graph = DependencyGraph()
    for source, target in (("A", "B"), ("X", "Y")):
        graph.add_edge(source, target)
    return graph
def _empty_graph():
    """Return a freshly constructed DependencyGraph with nothing added to it."""
    blank = DependencyGraph()
    return blank
def _isolated_nodes():
    """Return ``get_subgraph({"A", "B", "C"})`` of a graph holding only A -> B.

    NOTE(review): the helper's name suggests a graph with nodes but no edges.
    Whether the A -> B edge survives the subgraph extraction, and whether the
    never-added node "C" appears in the result, depends on how
    ``DependencyGraph.get_subgraph`` behaves -- confirm before relying on it.
    """
    base = DependencyGraph()
    # add_edge is the only way used here to introduce nodes; it adds A and B.
    base.add_edge("A", "B")
    wanted_nodes = {"A", "B", "C"}
    return base.get_subgraph(wanted_nodes)
# ── to_networkx ─────────────────────────────────────────────────────
class TestToNetworkx:
    """Tests for converting a DependencyGraph with ``to_networkx``."""

    def test_preserves_nodes(self):
        """All four chain nodes appear in the converted graph."""
        converted = to_networkx(_linear_graph())
        expected_nodes = {"A", "B", "C", "D"}
        assert set(converted.nodes) == expected_nodes

    def test_preserves_edges(self):
        """Chain edges are present and a non-existent edge is absent."""
        converted = to_networkx(_linear_graph())
        assert converted.has_edge("A", "B")
        assert converted.has_edge("B", "C")
        # D -> A was never added to the chain.
        assert not converted.has_edge("D", "A")

    def test_preserves_edge_type(self):
        """The edge type is exposed as the ``edge_type`` edge attribute."""
        source_graph = DependencyGraph()
        source_graph.add_edge("A", "B", EdgeType.GENERATES)
        converted = to_networkx(source_graph)
        edge_attrs = converted.edges["A", "B"]
        assert edge_attrs["edge_type"] == "generates"

    def test_empty_graph(self):
        """An empty DependencyGraph converts to a graph with no nodes or edges."""
        converted = to_networkx(_empty_graph())
        assert len(converted.nodes) == 0
        assert len(converted.edges) == 0
# ── Connected components ────────────────────────────────────────────
class TestConnectedComponents:
    """Tests for ``connected_components``."""

    def test_single_component(self):
        """The chain graph forms exactly one component holding every node."""
        components = connected_components(_linear_graph())
        assert len(components) == 1
        assert components[0] == {"A", "B", "C", "D"}

    def test_two_components(self):
        """Two unrelated edges yield two components, in either order."""
        components = connected_components(_disconnected_graph())
        assert len(components) == 2
        # Freeze each component so membership can be checked order-independently.
        frozen = [frozenset(component) for component in components]
        assert frozenset({"A", "B"}) in frozen
        assert frozenset({"X", "Y"}) in frozen

    def test_sorted_largest_first(self):
        """The first component is at least as large as the second."""
        graph = DependencyGraph()
        for source, target in (("A", "B"), ("B", "C"), ("X", "Y")):
            graph.add_edge(source, target)
        components = connected_components(graph)
        first_size = len(components[0])
        second_size = len(components[1])
        assert first_size >= second_size

    def test_empty_graph(self):
        """An empty graph has no components."""
        components = connected_components(_empty_graph())
        assert components == []
# ── Betweenness centrality ──────────────────────────────────────────
class TestBetweennessCentrality:
    """Tests for ``betweenness_centrality``."""

    def test_linear_chain_middle_node_highest(self):
        """Interior chain nodes score higher than the chain's endpoints."""
        scores = betweenness_centrality(_linear_graph())
        # B and C sit on the A -> D path, whereas A and D are only endpoints.
        assert scores["B"] > scores["A"]
        assert scores["C"] > scores["D"]

    def test_values_in_range(self):
        """Every reported score lies in the closed interval [0, 1]."""
        scores = betweenness_centrality(_two_clusters())
        for score in scores.values():
            assert 0.0 <= score <= 1.0

    def test_empty_graph(self):
        """An empty graph yields an empty mapping."""
        scores = betweenness_centrality(_empty_graph())
        assert scores == {}
# ── Community detection ─────────────────────────────────────────────
class TestDetectCommunities:
    """Tests for ``detect_communities``."""

    def test_two_clusters_detected(self):
        """The two-cluster graph is split into a partition of >= 2 communities."""
        comms = detect_communities(_two_clusters(), seed=42)
        # Should detect at least 2 communities
        assert len(comms) >= 2
        # Every node is covered by some community ...
        all_nodes = set().union(*comms)
        assert all_nodes == {"A", "B", "C", "X", "Y", "Z"}
        # ... and by exactly one: if any node appeared in two communities the
        # summed sizes would exceed the size of the union.
        assert sum(len(c) for c in comms) == len(all_nodes)

    def test_deterministic_with_seed(self):
        """Two runs with the same seed return equal results."""
        g = _two_clusters()
        c1 = detect_communities(g, seed=42)
        c2 = detect_communities(g, seed=42)
        assert c1 == c2

    def test_empty_graph(self):
        """An empty graph yields no communities."""
        assert detect_communities(_empty_graph()) == []

    def test_sorted_largest_first(self):
        """Communities are returned in non-increasing order of size."""
        comms = detect_communities(_two_clusters(), seed=42)
        sizes = [len(c) for c in comms]
        assert sizes == sorted(sizes, reverse=True)
# ── Modularity score ────────────────────────────────────────────────
class TestModularityScore:
    """Tests for ``modularity_score``."""

    def test_no_edges_returns_zero(self):
        """An empty graph scores exactly 0.0."""
        score = modularity_score(_empty_graph())
        assert score == 0.0

    def test_two_clusters_positive(self):
        """Splitting the graph along its two clusters gives a positive score."""
        partition = [{"A", "B", "C"}, {"X", "Y", "Z"}]
        score = modularity_score(_two_clusters(), communities=partition)
        assert score > 0.0

    def test_single_community_near_zero(self):
        """Putting every node in one community scores (numerically) zero."""
        everything = {"A", "B", "C", "X", "Y", "Z"}
        score = modularity_score(_two_clusters(), communities=[everything])
        assert score == pytest.approx(0.0, abs=1e-10)
# ── Degree distribution ─────────────────────────────────────────────
class TestDegreeDistribution:
    """Tests for ``degree_distribution``."""

    def test_linear_chain(self):
        """In/out degrees of the chain's head, an interior node, and its tail."""
        degrees = degree_distribution(_linear_graph())
        head, interior, tail = degrees["A"], degrees["B"], degrees["D"]
        # A: out=1 in=0; B: out=1 in=1; D: out=0 in=1
        assert head["out_degree"] == 1
        assert head["in_degree"] == 0
        assert interior["in_degree"] == 1
        assert interior["out_degree"] == 1
        assert tail["in_degree"] == 1
        assert tail["out_degree"] == 0

    def test_total_degree(self):
        """For every node, total_degree equals in_degree plus out_degree."""
        degrees = degree_distribution(_linear_graph())
        for entry in degrees.values():
            expected_total = entry["in_degree"] + entry["out_degree"]
            assert entry["total_degree"] == expected_total

    def test_empty_graph(self):
        """An empty graph yields an empty mapping."""
        degrees = degree_distribution(_empty_graph())
        assert degrees == {}
# ── Cohesion / coupling ─────────────────────────────────────────────
class TestCohesionCoupling:
    """Tests for ``cohesion_coupling``."""

    def test_two_clusters_with_bridge(self):
        """Edge counts and ratios for the two-cluster graph split by cluster."""
        partition = [{"A", "B", "C"}, {"X", "Y", "Z"}]
        metrics = cohesion_coupling(_two_clusters(), communities=partition)
        # _two_clusters adds 6 + 6 intra-cluster edges and 1 bridge: 13 total.
        intra, inter, total = 12, 1, 13
        assert metrics["intra_edges"] == intra
        assert metrics["inter_edges"] == inter
        assert metrics["total_edges"] == total
        assert metrics["cohesion"] == pytest.approx(intra / total)
        assert metrics["coupling"] == pytest.approx(inter / total)
        assert metrics["communities"] == 2

    def test_no_edges(self):
        """An empty graph reports zero edges and zero for both ratios."""
        metrics = cohesion_coupling(_empty_graph())
        assert metrics["cohesion"] == 0.0
        assert metrics["coupling"] == 0.0
        assert metrics["total_edges"] == 0

    def test_ratios_sum_to_one(self):
        """Cohesion and coupling add up to 1 on the two-cluster graph."""
        partition = [{"A", "B", "C"}, {"X", "Y", "Z"}]
        metrics = cohesion_coupling(_two_clusters(), communities=partition)
        ratio_sum = metrics["cohesion"] + metrics["coupling"]
        assert ratio_sum == pytest.approx(1.0)