From bad01e32bd220e5d085f5c4b6e724a6e3cf2d94c Mon Sep 17 00:00:00 2001
From: tegwick
Date: Thu, 19 Feb 2026 01:34:53 +0100
Subject: [PATCH] feat(analysis): add graph analysis utilities with networkx (S1.4)

Add connected components, betweenness centrality, Louvain community
detection, modularity scoring, degree distribution, and cohesion/coupling
computation. Wraps DependencyGraph via networkx (optional dependency) for
downstream collection-level coherence metrics.

Co-Authored-By: Claude Opus 4.6
---
 markitect/analysis/__init__.py    |   6 +
 markitect/analysis/graph.py       | 188 +++++++++++++++++++++
 pyproject.toml                    |   1 +
 tests/unit/analysis/__init__.py   |   0
 tests/unit/analysis/test_graph.py | 261 ++++++++++++++++++++++++++++++
 5 files changed, 456 insertions(+)
 create mode 100644 markitect/analysis/__init__.py
 create mode 100644 markitect/analysis/graph.py
 create mode 100644 tests/unit/analysis/__init__.py
 create mode 100644 tests/unit/analysis/test_graph.py

diff --git a/markitect/analysis/__init__.py b/markitect/analysis/__init__.py
new file mode 100644
index 00000000..94f3b51b
--- /dev/null
+++ b/markitect/analysis/__init__.py
@@ -0,0 +1,6 @@
+"""
+markitect.analysis — Analytical utilities for MarkiTect.
+
+Provides graph analysis, similarity computation, and other
+quantitative tools used by infospace tooling.
+"""
diff --git a/markitect/analysis/graph.py b/markitect/analysis/graph.py
new file mode 100644
index 00000000..14a5846d
--- /dev/null
+++ b/markitect/analysis/graph.py
@@ -0,0 +1,188 @@
+"""
+Graph analysis utilities for collection-level metrics.
+
+Provides connected components, centrality, community detection,
+modularity, degree distribution, and cohesion/coupling computation.
+
+Requires ``networkx`` (optional dependency)::
+
+    pip install networkx
+"""
+
+from __future__ import annotations
+
+from typing import Optional
+
+from markitect.prompts.dependencies.models import DependencyGraph
+
+
+def _require_networkx():
+    """Import and return networkx, raising a clear error if missing."""
+    try:
+        import networkx as nx
+        return nx
+    except ImportError:
+        raise ImportError(
+            "networkx is required for graph analysis. "
+            "Install it with: pip install networkx"
+        ) from None
+
+
+def to_networkx(graph: DependencyGraph):
+    """Convert a :class:`DependencyGraph` to a networkx ``DiGraph``.
+
+    Each edge carries an ``edge_type`` attribute (string value of the
+    :class:`EdgeType` enum, or ``None``).
+    """
+    nx = _require_networkx()
+    G = nx.DiGraph()
+    G.add_nodes_from(graph.nodes)
+    for node in graph.nodes:
+        for succ in graph.get_successors(node):
+            edge_type = graph.get_edge_type(node, succ)
+            G.add_edge(
+                node, succ,
+                edge_type=edge_type.value if edge_type is not None else None,
+            )
+    return G
+
+
+def connected_components(graph: DependencyGraph) -> list[set[str]]:
+    """Find weakly connected components (edges treated as undirected).
+
+    Returns a list of node sets, one per component, sorted largest-first.
+    """
+    nx = _require_networkx()
+    G = to_networkx(graph)
+    components = list(nx.weakly_connected_components(G))
+    components.sort(key=len, reverse=True)
+    return [set(c) for c in components]
+
+
+def betweenness_centrality(graph: DependencyGraph) -> dict[str, float]:
+    """Compute betweenness centrality for all nodes.
+
+    Returns a dict mapping node ID to centrality score in [0, 1].
+    """
+    nx = _require_networkx()
+    G = to_networkx(graph)
+    return nx.betweenness_centrality(G)
+
+
+def detect_communities(
+    graph: DependencyGraph,
+    seed: Optional[int] = None,
+) -> list[set[str]]:
+    """Detect communities using the Louvain algorithm.
+
+    Operates on an undirected projection of the graph. Returns a list
+    of node sets, one per community, sorted largest-first.
+
+    Args:
+        graph: The dependency graph to analyse.
+        seed: Random seed for reproducibility (passed to Louvain).
+    """
+    nx = _require_networkx()
+    G = to_networkx(graph).to_undirected()
+    if len(G.nodes) == 0:
+        return []
+    communities = list(nx.community.louvain_communities(G, seed=seed))
+    communities.sort(key=len, reverse=True)
+    return [set(c) for c in communities]
+
+
+def modularity_score(
+    graph: DependencyGraph,
+    communities: Optional[list[set[str]]] = None,
+    seed: Optional[int] = None,
+) -> float:
+    """Compute the modularity score for a community partition.
+
+    Args:
+        graph: The dependency graph.
+        communities: Pre-computed communities. If ``None``, communities
+            are detected via :func:`detect_communities`.
+        seed: Random seed (used only when *communities* is ``None``).
+
+    Returns:
+        Modularity in [-0.5, 1.0]. Returns 0.0 for graphs with no edges.
+    """
+    nx = _require_networkx()
+    G = to_networkx(graph).to_undirected()
+    if len(G.edges) == 0:
+        return 0.0
+    if communities is None:
+        communities = detect_communities(graph, seed=seed)
+    return nx.community.modularity(G, communities)
+
+
+def degree_distribution(graph: DependencyGraph) -> dict[str, dict[str, int]]:
+    """Compute in-degree, out-degree, and total degree for each node.
+
+    Returns::
+
+        {"node_id": {"in_degree": 2, "out_degree": 1, "total_degree": 3}, ...}
+    """
+    _require_networkx()
+    G = to_networkx(graph)
+    result = {}
+    for node in G.nodes:
+        ind = G.in_degree(node)
+        outd = G.out_degree(node)
+        result[node] = {
+            "in_degree": ind,
+            "out_degree": outd,
+            "total_degree": ind + outd,
+        }
+    return result
+
+
+def cohesion_coupling(
+    graph: DependencyGraph,
+    communities: Optional[list[set[str]]] = None,
+    seed: Optional[int] = None,
+) -> dict:
+    """Compute cohesion (intra-community edges) and coupling (inter-community edges).
+
+    Args:
+        graph: The dependency graph.
+        communities: Pre-computed communities. If ``None``, detected
+            via :func:`detect_communities`. An edge touching a node
+            that belongs to no community counts as coupling, never
+            as cohesion.
+        seed: Random seed (used only when *communities* is ``None``).
+
+    Returns:
+        Dict with keys ``cohesion``, ``coupling`` (ratios in [0, 1]),
+        ``intra_edges``, ``inter_edges``, ``total_edges``, ``communities``.
+    """
+    _require_networkx()
+    G = to_networkx(graph)
+    if communities is None:
+        communities = detect_communities(graph, seed=seed)
+
+    # Build node → community index
+    node_community: dict[str, int] = {}
+    for i, comm in enumerate(communities):
+        for node in comm:
+            node_community[node] = i
+
+    intra = 0
+    inter = 0
+    for u, v in G.edges:
+        comm_u = node_community.get(u)
+        # Two unassigned endpoints must not match via None == None.
+        if comm_u is not None and comm_u == node_community.get(v):
+            intra += 1
+        else:
+            inter += 1
+
+    total = intra + inter
+    return {
+        "cohesion": intra / total if total > 0 else 0.0,
+        "coupling": inter / total if total > 0 else 0.0,
+        "intra_edges": intra,
+        "inter_edges": inter,
+        "total_edges": total,
+        "communities": len(communities),
+    }
diff --git a/pyproject.toml b/pyproject.toml
index b13e989c..9af3573d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -33,6 +33,7 @@ development = [
     "kaizen-agentic @ file:./capabilities/kaizen-agentic"
 ]
 proxy-pdf = ["pymupdf4llm>=0.0.10"]
+analysis = ["networkx>=3.0"]
 proxy-html = ["markdownify>=0.13.1"]
 proxy-markitdown = ["markitdown-no-magika[pdf]"]
 proxy = ["markitdown-no-magika[pdf]"]
diff --git a/tests/unit/analysis/__init__.py b/tests/unit/analysis/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/unit/analysis/test_graph.py b/tests/unit/analysis/test_graph.py
new file mode 100644
index 00000000..6c16508e
--- /dev/null
+++ b/tests/unit/analysis/test_graph.py
@@ -0,0 +1,261 @@
+"""Tests for markitect.analysis.graph."""
+
+import pytest
+
+nx = pytest.importorskip("networkx", reason="networkx not installed")
+
+from markitect.prompts.dependencies.models import DependencyGraph, EdgeType
+from markitect.analysis.graph import (
+    to_networkx,
+    connected_components,
+    betweenness_centrality,
+    detect_communities,
+    modularity_score,
+    degree_distribution,
+    cohesion_coupling,
+)
+
+
+# ── Helpers ──────────────────────────────────────────────────────────
+
+
+def _linear_graph():
+    """A -> B -> C -> D (simple chain)."""
+    g = DependencyGraph()
+    g.add_edge("A", "B")
+    g.add_edge("B", "C")
+    g.add_edge("C", "D")
+    return g
+
+
+def _two_clusters():
+    """Two dense clusters connected by a single bridge edge.
+
+    Cluster 1: A -- B -- C (fully connected)
+    Cluster 2: X -- Y -- Z (fully connected)
+    Bridge: C -> X
+    """
+    g = DependencyGraph()
+    # Cluster 1
+    g.add_edge("A", "B")
+    g.add_edge("B", "A")
+    g.add_edge("B", "C")
+    g.add_edge("C", "B")
+    g.add_edge("A", "C")
+    g.add_edge("C", "A")
+    # Cluster 2
+    g.add_edge("X", "Y")
+    g.add_edge("Y", "X")
+    g.add_edge("Y", "Z")
+    g.add_edge("Z", "Y")
+    g.add_edge("X", "Z")
+    g.add_edge("Z", "X")
+    # Bridge
+    g.add_edge("C", "X")
+    return g
+
+
+def _disconnected_graph():
+    """Two separate components: {A, B} and {X, Y}."""
+    g = DependencyGraph()
+    g.add_edge("A", "B")
+    g.add_edge("X", "Y")
+    return g
+
+
+def _empty_graph():
+    """Graph with no nodes or edges."""
+    return DependencyGraph()
+
+
+def _isolated_nodes():
+    """Graph with nodes but no edges."""
+    g = DependencyGraph()
+    # add_edge creates both nodes, so we use two separate edges
+    # and then extract a subgraph with isolated nodes
+    g.add_edge("A", "B")
+    return g.get_subgraph({"A", "B", "C"})
+
+
+# ── to_networkx ─────────────────────────────────────────────────────
+
+
+class TestToNetworkx:
+    def test_preserves_nodes(self):
+        g = _linear_graph()
+        G = to_networkx(g)
+        assert set(G.nodes) == {"A", "B", "C", "D"}
+
+    def test_preserves_edges(self):
+        g = _linear_graph()
+        G = to_networkx(g)
+        assert G.has_edge("A", "B")
+        assert G.has_edge("B", "C")
+        assert not G.has_edge("D", "A")
+
+    def test_preserves_edge_type(self):
+        g = DependencyGraph()
+        g.add_edge("A", "B", EdgeType.GENERATES)
+        G = to_networkx(g)
+        assert G.edges["A", "B"]["edge_type"] == "generates"
+
+    def test_empty_graph(self):
+        G = to_networkx(_empty_graph())
+        assert len(G.nodes) == 0
+        assert len(G.edges) == 0
+
+
+# ── Connected components ────────────────────────────────────────────
+
+
+class TestConnectedComponents:
+    def test_single_component(self):
+        comps = connected_components(_linear_graph())
+        assert len(comps) == 1
+        assert comps[0] == {"A", "B", "C", "D"}
+
+    def test_two_components(self):
+        comps = connected_components(_disconnected_graph())
+        assert len(comps) == 2
+        node_sets = [frozenset(c) for c in comps]
+        assert frozenset({"A", "B"}) in node_sets
+        assert frozenset({"X", "Y"}) in node_sets
+
+    def test_sorted_largest_first(self):
+        g = DependencyGraph()
+        g.add_edge("A", "B")
+        g.add_edge("B", "C")
+        g.add_edge("X", "Y")
+        comps = connected_components(g)
+        assert len(comps[0]) >= len(comps[1])
+
+    def test_empty_graph(self):
+        assert connected_components(_empty_graph()) == []
+
+
+# ── Betweenness centrality ──────────────────────────────────────────
+
+
+class TestBetweennessCentrality:
+    def test_linear_chain_middle_node_highest(self):
+        g = _linear_graph()
+        bc = betweenness_centrality(g)
+        # B and C are on all shortest paths between endpoints
+        assert bc["B"] > bc["A"]
+        assert bc["C"] > bc["D"]
+
+    def test_values_in_range(self):
+        bc = betweenness_centrality(_two_clusters())
+        for v in bc.values():
+            assert 0.0 <= v <= 1.0
+
+    def test_empty_graph(self):
+        assert betweenness_centrality(_empty_graph()) == {}
+
+
+# ── Community detection ─────────────────────────────────────────────
+
+
+class TestDetectCommunities:
+    def test_two_clusters_detected(self):
+        comms = detect_communities(_two_clusters(), seed=42)
+        # Should detect at least 2 communities
+        assert len(comms) >= 2
+        # Each node in exactly one community
+        all_nodes = set()
+        for c in comms:
+            all_nodes.update(c)
+        assert all_nodes == {"A", "B", "C", "X", "Y", "Z"}
+
+    def test_deterministic_with_seed(self):
+        g = _two_clusters()
+        c1 = detect_communities(g, seed=42)
+        c2 = detect_communities(g, seed=42)
+        assert c1 == c2
+
+    def test_empty_graph(self):
+        assert detect_communities(_empty_graph()) == []
+
+    def test_sorted_largest_first(self):
+        comms = detect_communities(_two_clusters(), seed=42)
+        sizes = [len(c) for c in comms]
+        assert sizes == sorted(sizes, reverse=True)
+
+
+# ── Modularity score ────────────────────────────────────────────────
+
+
+class TestModularityScore:
+    def test_no_edges_returns_zero(self):
+        assert modularity_score(_empty_graph()) == 0.0
+
+    def test_two_clusters_positive(self):
+        g = _two_clusters()
+        comms = [{"A", "B", "C"}, {"X", "Y", "Z"}]
+        score = modularity_score(g, communities=comms)
+        assert score > 0.0
+
+    def test_single_community_near_zero(self):
+        g = _two_clusters()
+        all_nodes = {"A", "B", "C", "X", "Y", "Z"}
+        score = modularity_score(g, communities=[all_nodes])
+        assert score == pytest.approx(0.0, abs=1e-10)
+
+
+# ── Degree distribution ─────────────────────────────────────────────
+
+
+class TestDegreeDistribution:
+    def test_linear_chain(self):
+        dd = degree_distribution(_linear_graph())
+        # A: out=1 in=0; B: out=1 in=1; D: out=0 in=1
+        assert dd["A"]["out_degree"] == 1
+        assert dd["A"]["in_degree"] == 0
+        assert dd["B"]["in_degree"] == 1
+        assert dd["B"]["out_degree"] == 1
+        assert dd["D"]["in_degree"] == 1
+        assert dd["D"]["out_degree"] == 0
+
+    def test_total_degree(self):
+        dd = degree_distribution(_linear_graph())
+        for node, degrees in dd.items():
+            assert degrees["total_degree"] == degrees["in_degree"] + degrees["out_degree"]
+
+    def test_empty_graph(self):
+        assert degree_distribution(_empty_graph()) == {}
+
+
+# ── Cohesion / coupling ─────────────────────────────────────────────
+
+
+class TestCohesionCoupling:
+    def test_two_clusters_with_bridge(self):
+        g = _two_clusters()
+        comms = [{"A", "B", "C"}, {"X", "Y", "Z"}]
+        cc = cohesion_coupling(g, communities=comms)
+        # 12 intra-cluster edges + 1 bridge = 13 total
+        assert cc["intra_edges"] == 12
+        assert cc["inter_edges"] == 1
+        assert cc["total_edges"] == 13
+        assert cc["cohesion"] == pytest.approx(12 / 13)
+        assert cc["coupling"] == pytest.approx(1 / 13)
+        assert cc["communities"] == 2
+
+    def test_no_edges(self):
+        cc = cohesion_coupling(_empty_graph())
+        assert cc["cohesion"] == 0.0
+        assert cc["coupling"] == 0.0
+        assert cc["total_edges"] == 0
+
+    def test_ratios_sum_to_one(self):
+        g = _two_clusters()
+        comms = [{"A", "B", "C"}, {"X", "Y", "Z"}]
+        cc = cohesion_coupling(g, communities=comms)
+        assert cc["cohesion"] + cc["coupling"] == pytest.approx(1.0)
+
+    def test_unassigned_nodes_count_as_coupling(self):
+        g = _linear_graph()
+        cc = cohesion_coupling(g, communities=[{"A", "B"}])
+        # A->B is intra; B->C and C->D touch nodes outside every community
+        assert cc["intra_edges"] == 1
+        assert cc["inter_edges"] == 2