feat(analysis): add graph analysis utilities with networkx (S1.4)
Add connected components, betweenness centrality, Louvain community detection, modularity scoring, degree distribution, and cohesion/coupling computation. Wraps DependencyGraph via networkx (optional dependency) for downstream collection-level coherence metrics. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
6
markitect/analysis/__init__.py
Normal file
6
markitect/analysis/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
"""
|
||||||
|
markitect.analysis — Analytical utilities for MarkiTect.
|
||||||
|
|
||||||
|
Provides graph analysis, similarity computation, and other
|
||||||
|
quantitative tools used by infospace tooling.
|
||||||
|
"""
|
||||||
184
markitect/analysis/graph.py
Normal file
184
markitect/analysis/graph.py
Normal file
@@ -0,0 +1,184 @@
|
|||||||
|
"""
|
||||||
|
Graph analysis utilities for collection-level metrics.
|
||||||
|
|
||||||
|
Provides connected components, centrality, community detection,
|
||||||
|
modularity, degree distribution, and cohesion/coupling computation.
|
||||||
|
|
||||||
|
Requires ``networkx`` (optional dependency)::
|
||||||
|
|
||||||
|
pip install networkx
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from markitect.prompts.dependencies.models import DependencyGraph
|
||||||
|
|
||||||
|
|
||||||
|
def _require_networkx():
    """Return the ``networkx`` module, importing it on first use.

    Raises:
        ImportError: If ``networkx`` cannot be imported. The underlying
            import failure is suppressed (``from None``) so that only the
            installation hint is shown.
    """
    try:
        import networkx
    except ImportError:
        raise ImportError(
            "networkx is required for graph analysis. "
            "Install it with: pip install networkx"
        ) from None
    return networkx
|
||||||
|
|
||||||
|
|
||||||
|
def to_networkx(graph: DependencyGraph):
    """Build a networkx ``DiGraph`` that mirrors a :class:`DependencyGraph`.

    Every entry of ``graph.nodes`` is copied as a node, and one directed
    edge is added for each (node, successor) pair reported by
    ``graph.get_successors``. Each edge carries an ``edge_type`` attribute
    holding the ``.value`` of whatever ``graph.get_edge_type`` returns, or
    ``None`` when that result is falsy.
    """
    nx = _require_networkx()
    digraph = nx.DiGraph()
    # Add nodes up front so nodes without any edge are still present.
    digraph.add_nodes_from(graph.nodes)
    for source in graph.nodes:
        for target in graph.get_successors(source):
            kind = graph.get_edge_type(source, target)
            label = kind.value if kind else None
            digraph.add_edge(source, target, edge_type=label)
    return digraph
|
||||||
|
|
||||||
|
|
||||||
|
def connected_components(graph: DependencyGraph) -> list[set[str]]:
    """Return the weakly connected components of *graph*.

    Edge direction is ignored when grouping nodes. The result holds one
    node set per component, ordered from the largest component to the
    smallest.
    """
    nx = _require_networkx()
    digraph = to_networkx(graph)
    ordered = sorted(
        nx.weakly_connected_components(digraph), key=len, reverse=True
    )
    return [set(members) for members in ordered]
|
||||||
|
|
||||||
|
|
||||||
|
def betweenness_centrality(graph: DependencyGraph) -> dict[str, float]:
    """Return the betweenness centrality of every node in *graph*.

    Delegates to ``networkx.betweenness_centrality`` on the directed
    conversion of *graph*, using networkx's default arguments.

    Returns:
        Mapping of node ID to its centrality score.
    """
    nx = _require_networkx()
    scores = nx.betweenness_centrality(to_networkx(graph))
    return scores
|
||||||
|
|
||||||
|
|
||||||
|
def detect_communities(
    graph: DependencyGraph,
    seed: Optional[int] = None,
) -> list[set[str]]:
    """Partition *graph* into communities with the Louvain algorithm.

    The directed graph is first projected to an undirected one. A graph
    without nodes yields an empty list without running Louvain.

    Args:
        graph: The dependency graph to analyse.
        seed: Random seed forwarded to ``louvain_communities`` for
            reproducible results.

    Returns:
        One node set per community, largest community first.
    """
    nx = _require_networkx()
    undirected = to_networkx(graph).to_undirected()
    if undirected.number_of_nodes() == 0:
        return []
    found = sorted(
        nx.community.louvain_communities(undirected, seed=seed),
        key=len,
        reverse=True,
    )
    return [set(group) for group in found]
|
||||||
|
|
||||||
|
|
||||||
|
def modularity_score(
    graph: DependencyGraph,
    communities: Optional[list[set[str]]] = None,
    seed: Optional[int] = None,
) -> float:
    """Return the modularity of a community partition of *graph*.

    The score is computed on the undirected projection of the graph.

    Args:
        graph: The dependency graph.
        communities: Pre-computed communities. When ``None`` they are
            obtained from :func:`detect_communities`.
        seed: Random seed, only used when *communities* is ``None``.

    Returns:
        The value of ``networkx.community.modularity`` for the partition,
        or ``0.0`` when the graph has no edges.
    """
    nx = _require_networkx()
    undirected = to_networkx(graph).to_undirected()
    # Short-circuit on edgeless graphs: they are reported as 0.0 and
    # networkx is never asked to score them.
    if undirected.number_of_edges() == 0:
        return 0.0
    if communities is not None:
        partition = communities
    else:
        partition = detect_communities(graph, seed=seed)
    return nx.community.modularity(undirected, partition)
|
||||||
|
|
||||||
|
|
||||||
|
def degree_distribution(graph: DependencyGraph) -> dict[str, dict[str, int]]:
    """Compute in-degree, out-degree, and total degree for each node.

    Returns a mapping keyed by node ID, e.g.::

        {"node_id": {"in_degree": 2, "out_degree": 1, "total_degree": 3}, ...}

    ``total_degree`` is always ``in_degree + out_degree``. A graph with no
    nodes yields an empty dict.
    """
    # Called only for its early, clear ImportError; the module object
    # itself is not needed here.
    _require_networkx()
    digraph = to_networkx(graph)
    result: dict[str, dict[str, int]] = {}
    for node in digraph.nodes:
        in_deg = digraph.in_degree(node)
        out_deg = digraph.out_degree(node)
        result[node] = {
            "in_degree": in_deg,
            "out_degree": out_deg,
            "total_degree": in_deg + out_deg,
        }
    return result
|
||||||
|
|
||||||
|
|
||||||
|
def cohesion_coupling(
    graph: DependencyGraph,
    communities: Optional[list[set[str]]] = None,
    seed: Optional[int] = None,
) -> dict:
    """Compute cohesion (intra-community edges) and coupling (inter-community edges).

    Every directed edge of the converted graph is counted exactly once:
    as *intra* when both endpoints are assigned to the same community,
    otherwise as *inter*. An edge that touches a node missing from every
    supplied community is counted as *inter*, including the case where
    both endpoints are missing. If a node appears in several communities,
    the last community listed wins.

    Args:
        graph: The dependency graph.
        communities: Pre-computed communities. If ``None``, detected
            via :func:`detect_communities`.
        seed: Random seed (used only when *communities* is ``None``).

    Returns:
        Dict with keys ``cohesion`` and ``coupling`` (``intra_edges`` and
        ``inter_edges`` divided by ``total_edges``, or ``0.0`` when there
        are no edges), ``intra_edges``, ``inter_edges``, ``total_edges``,
        and ``communities`` (the number of communities).
    """
    _require_networkx()
    G = to_networkx(graph)
    if communities is None:
        communities = detect_communities(graph, seed=seed)

    # Build node -> community index (later communities override earlier ones).
    node_community: dict[str, int] = {
        node: index
        for index, comm in enumerate(communities)
        for node in comm
    }

    intra = 0
    inter = 0
    for u, v in G.edges:
        u_comm = node_community.get(u)
        # Require an actual assignment: two unassigned endpoints would both
        # map to None and must not be mistaken for "same community".
        if u_comm is not None and u_comm == node_community.get(v):
            intra += 1
        else:
            inter += 1

    total = intra + inter
    return {
        "cohesion": intra / total if total > 0 else 0.0,
        "coupling": inter / total if total > 0 else 0.0,
        "intra_edges": intra,
        "inter_edges": inter,
        "total_edges": total,
        "communities": len(communities),
    }
|
||||||
@@ -33,6 +33,7 @@ development = [
|
|||||||
"kaizen-agentic @ file:./capabilities/kaizen-agentic"
|
"kaizen-agentic @ file:./capabilities/kaizen-agentic"
|
||||||
]
|
]
|
||||||
proxy-pdf = ["pymupdf4llm>=0.0.10"]
|
proxy-pdf = ["pymupdf4llm>=0.0.10"]
|
||||||
|
analysis = ["networkx>=3.0"]
|
||||||
proxy-html = ["markdownify>=0.13.1"]
|
proxy-html = ["markdownify>=0.13.1"]
|
||||||
proxy-markitdown = ["markitdown-no-magika[pdf]"]
|
proxy-markitdown = ["markitdown-no-magika[pdf]"]
|
||||||
proxy = ["markitdown-no-magika[pdf]"]
|
proxy = ["markitdown-no-magika[pdf]"]
|
||||||
|
|||||||
0
tests/unit/analysis/__init__.py
Normal file
0
tests/unit/analysis/__init__.py
Normal file
254
tests/unit/analysis/test_graph.py
Normal file
254
tests/unit/analysis/test_graph.py
Normal file
@@ -0,0 +1,254 @@
|
|||||||
|
"""Tests for markitect.analysis.graph."""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
nx = pytest.importorskip("networkx", reason="networkx not installed")
|
||||||
|
|
||||||
|
from markitect.prompts.dependencies.models import DependencyGraph, EdgeType
|
||||||
|
from markitect.analysis.graph import (
|
||||||
|
to_networkx,
|
||||||
|
connected_components,
|
||||||
|
betweenness_centrality,
|
||||||
|
detect_communities,
|
||||||
|
modularity_score,
|
||||||
|
degree_distribution,
|
||||||
|
cohesion_coupling,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Helpers ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def _linear_graph():
    """Build the simple chain A -> B -> C -> D."""
    chain = DependencyGraph()
    for src, dst in (("A", "B"), ("B", "C"), ("C", "D")):
        chain.add_edge(src, dst)
    return chain
|
||||||
|
|
||||||
|
|
||||||
|
def _two_clusters():
    """Build two dense clusters joined by a single bridge edge.

    Cluster 1: A, B, C with an edge in both directions between every pair.
    Cluster 2: X, Y, Z with an edge in both directions between every pair.
    Bridge:    C -> X (one direction only).
    """
    g = DependencyGraph()
    # Pair order (and forward-before-reverse) is kept deliberately so the
    # edges are inserted in the same sequence as before.
    bidirectional_pairs = (
        ("A", "B"), ("B", "C"), ("A", "C"),  # cluster 1
        ("X", "Y"), ("Y", "Z"), ("X", "Z"),  # cluster 2
    )
    for first, second in bidirectional_pairs:
        g.add_edge(first, second)
        g.add_edge(second, first)
    # Bridge
    g.add_edge("C", "X")
    return g
|
||||||
|
|
||||||
|
|
||||||
|
def _disconnected_graph():
    """Build two separate components: {A, B} and {X, Y}."""
    g = DependencyGraph()
    for src, dst in (("A", "B"), ("X", "Y")):
        g.add_edge(src, dst)
    return g
|
||||||
|
|
||||||
|
|
||||||
|
def _empty_graph():
    """Build a graph to which no nodes or edges have been added."""
    empty = DependencyGraph()
    return empty
|
||||||
|
|
||||||
|
|
||||||
|
def _isolated_nodes():
    """Graph with nodes but no edges.

    NOTE(review): relies on ``get_subgraph`` keeping only edges whose two
    endpoints are both in the requested node set — confirm against
    ``DependencyGraph.get_subgraph``.
    """
    g = DependencyGraph()
    # add_edge creates both nodes, so we use two separate edges and then
    # extract a subgraph holding one endpoint of each: neither retained
    # node has a retained neighbour, leaving them isolated.
    g.add_edge("A", "B")
    g.add_edge("C", "D")
    return g.get_subgraph({"A", "C"})
|
||||||
|
|
||||||
|
|
||||||
|
# ── to_networkx ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestToNetworkx:
    """Conversion of a DependencyGraph into a networkx DiGraph."""

    def test_preserves_nodes(self):
        converted = to_networkx(_linear_graph())
        assert set(converted.nodes) == {"A", "B", "C", "D"}

    def test_preserves_edges(self):
        converted = to_networkx(_linear_graph())
        assert converted.has_edge("A", "B")
        assert converted.has_edge("B", "C")
        # Direction matters: there is no edge back to the start of the chain.
        assert not converted.has_edge("D", "A")

    def test_preserves_edge_type(self):
        source = DependencyGraph()
        source.add_edge("A", "B", EdgeType.GENERATES)
        converted = to_networkx(source)
        assert converted.edges["A", "B"]["edge_type"] == "generates"

    def test_empty_graph(self):
        converted = to_networkx(_empty_graph())
        assert len(converted.nodes) == 0
        assert len(converted.edges) == 0
|
||||||
|
|
||||||
|
|
||||||
|
# ── Connected components ────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestConnectedComponents:
    """Weakly connected component discovery."""

    def test_single_component(self):
        found = connected_components(_linear_graph())
        assert len(found) == 1
        assert found[0] == {"A", "B", "C", "D"}

    def test_two_components(self):
        found = connected_components(_disconnected_graph())
        assert len(found) == 2
        # Compare as frozensets so the check does not depend on ordering.
        as_frozen = [frozenset(component) for component in found]
        assert frozenset({"A", "B"}) in as_frozen
        assert frozenset({"X", "Y"}) in as_frozen

    def test_sorted_largest_first(self):
        g = DependencyGraph()
        for src, dst in (("A", "B"), ("B", "C"), ("X", "Y")):
            g.add_edge(src, dst)
        found = connected_components(g)
        assert len(found[0]) >= len(found[1])

    def test_empty_graph(self):
        assert connected_components(_empty_graph()) == []
|
||||||
|
|
||||||
|
|
||||||
|
# ── Betweenness centrality ──────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestBetweennessCentrality:
    """Betweenness centrality scores."""

    def test_linear_chain_middle_node_highest(self):
        scores = betweenness_centrality(_linear_graph())
        # Interior nodes B and C lie on paths between the endpoints,
        # so each must outrank the endpoint next to it.
        assert scores["B"] > scores["A"]
        assert scores["C"] > scores["D"]

    def test_values_in_range(self):
        scores = betweenness_centrality(_two_clusters())
        assert all(0.0 <= value <= 1.0 for value in scores.values())

    def test_empty_graph(self):
        assert betweenness_centrality(_empty_graph()) == {}
|
||||||
|
|
||||||
|
|
||||||
|
# ── Community detection ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestDetectCommunities:
    """Louvain community detection."""

    def test_two_clusters_detected(self):
        comms = detect_communities(_two_clusters(), seed=42)
        # Should detect at least 2 communities
        assert len(comms) >= 2
        # Every node is covered by some community ...
        expected_nodes = {"A", "B", "C", "X", "Y", "Z"}
        all_nodes = set()
        for c in comms:
            all_nodes.update(c)
        assert all_nodes == expected_nodes
        # ... and by exactly one: community sizes can only add up to the
        # node count when no node is shared between communities.
        assert sum(len(c) for c in comms) == len(expected_nodes)

    def test_deterministic_with_seed(self):
        g = _two_clusters()
        c1 = detect_communities(g, seed=42)
        c2 = detect_communities(g, seed=42)
        assert c1 == c2

    def test_empty_graph(self):
        assert detect_communities(_empty_graph()) == []

    def test_sorted_largest_first(self):
        comms = detect_communities(_two_clusters(), seed=42)
        sizes = [len(c) for c in comms]
        assert sizes == sorted(sizes, reverse=True)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Modularity score ────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestModularityScore:
    """Modularity of explicit and detected partitions."""

    def test_no_edges_returns_zero(self):
        score = modularity_score(_empty_graph())
        assert score == 0.0

    def test_two_clusters_positive(self):
        partition = [{"A", "B", "C"}, {"X", "Y", "Z"}]
        score = modularity_score(_two_clusters(), communities=partition)
        assert score > 0.0

    def test_single_community_near_zero(self):
        # Putting every node in one community should give a score of zero.
        everything = {"A", "B", "C", "X", "Y", "Z"}
        score = modularity_score(_two_clusters(), communities=[everything])
        assert score == pytest.approx(0.0, abs=1e-10)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Degree distribution ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestDegreeDistribution:
    """Per-node in/out/total degree reporting."""

    def test_linear_chain(self):
        degrees = degree_distribution(_linear_graph())
        # A: out=1 in=0; B: out=1 in=1; D: out=0 in=1
        start, middle, end = degrees["A"], degrees["B"], degrees["D"]
        assert start["out_degree"] == 1
        assert start["in_degree"] == 0
        assert middle["in_degree"] == 1
        assert middle["out_degree"] == 1
        assert end["in_degree"] == 1
        assert end["out_degree"] == 0

    def test_total_degree(self):
        degrees = degree_distribution(_linear_graph())
        for entry in degrees.values():
            assert entry["total_degree"] == entry["in_degree"] + entry["out_degree"]

    def test_empty_graph(self):
        assert degree_distribution(_empty_graph()) == {}
|
||||||
|
|
||||||
|
|
||||||
|
# ── Cohesion / coupling ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestCohesionCoupling:
    """Intra- versus inter-community edge accounting."""

    def test_two_clusters_with_bridge(self):
        partition = [{"A", "B", "C"}, {"X", "Y", "Z"}]
        report = cohesion_coupling(_two_clusters(), communities=partition)
        # 12 intra-cluster edges + 1 bridge = 13 total
        assert report["intra_edges"] == 12
        assert report["inter_edges"] == 1
        assert report["total_edges"] == 13
        assert report["cohesion"] == pytest.approx(12 / 13)
        assert report["coupling"] == pytest.approx(1 / 13)
        assert report["communities"] == 2

    def test_no_edges(self):
        report = cohesion_coupling(_empty_graph())
        assert report["cohesion"] == 0.0
        assert report["coupling"] == 0.0
        assert report["total_edges"] == 0

    def test_ratios_sum_to_one(self):
        partition = [{"A", "B", "C"}, {"X", "Y", "Z"}]
        report = cohesion_coupling(_two_clusters(), communities=partition)
        combined = report["cohesion"] + report["coupling"]
        assert combined == pytest.approx(1.0)
|
||||||
Reference in New Issue
Block a user