feat(analysis): add Formal Concept Analysis for coverage gap detection (S1.7)
Pure-Python FCA implementation: FormalContext (entity × attribute binary relation with extent/intent/closure), ConceptLattice via NextClosure algorithm, find_gap_concepts() for structural coverage gaps, and find_empty_cells() for cross-tabulation analysis. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
307
markitect/analysis/fca.py
Normal file
307
markitect/analysis/fca.py
Normal file
@@ -0,0 +1,307 @@
|
||||
"""
|
||||
Formal Concept Analysis (FCA) for coverage gap detection.
|
||||
|
||||
Provides a pure-Python implementation of:
|
||||
|
||||
- :class:`FormalContext` — entity × attribute binary relation with
|
||||
extent/intent operations and double-prime closure.
|
||||
- :class:`ConceptLattice` — the set of all formal concepts computed
|
||||
via the NextClosure algorithm (Ganter, 1984).
|
||||
- :func:`find_gap_concepts` — attribute combinations present in the
|
||||
lattice whose extent is empty, revealing structural coverage gaps.
|
||||
|
||||
Sufficient for entity scales of ~100s. For larger contexts a library
|
||||
such as ``concepts`` (PyPI) can be substituted.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Iterable, Optional
|
||||
|
||||
|
||||
class FormalContext:
    """Objects, attributes and the incidence relation linking them.

    Duplicates in *objects* and *attributes* are dropped and both
    collections are kept in sorted order.  Incidence entries for objects
    outside *objects* are ignored, and attributes outside *attributes*
    are discarded from every object's attribute set.

    Args:
        objects: Iterable of object identifiers (e.g. entity slugs).
        attributes: Iterable of attribute identifiers
            (e.g. "domain:Production").
        incidence: Mapping of object → attributes it possesses.  Objects
            missing from the mapping are treated as having no attributes.
    """

    def __init__(
        self,
        objects: Iterable[str],
        attributes: Iterable[str],
        incidence: dict[str, set[str]],
    ):
        self._obj_set = frozenset(objects)
        self._attr_set = frozenset(attributes)
        self._objects = sorted(self._obj_set)
        self._attributes = sorted(self._attr_set)

        # Per-object attribute sets, restricted to the declared attributes.
        self._incidence: dict[str, frozenset[str]] = {
            obj: frozenset(incidence.get(obj, ())) & self._attr_set
            for obj in self._objects
        }

        # Reverse index (attribute → owning objects), filled in one pass
        # over the incidence; attributes nobody owns map to an empty set.
        owners: dict[str, set[str]] = {attr: set() for attr in self._attributes}
        for obj, owned in self._incidence.items():
            for attr in owned:
                owners[attr].add(obj)
        self._attr_to_objs: dict[str, frozenset[str]] = {
            attr: frozenset(objs) for attr, objs in owners.items()
        }

    @property
    def objects(self) -> list[str]:
        """A fresh, sorted list of the object identifiers."""
        return self._objects.copy()

    @property
    def attributes(self) -> list[str]:
        """A fresh, sorted list of the attribute identifiers."""
        return self._attributes.copy()

    @property
    def object_count(self) -> int:
        """Number of distinct objects."""
        return len(self._objects)

    @property
    def attribute_count(self) -> int:
        """Number of distinct attributes."""
        return len(self._attributes)

    def extent(self, attrs: Iterable[str]) -> frozenset[str]:
        """Return the objects that carry **every** attribute in *attrs* (B').

        An empty *attrs* yields all objects.  An attribute unknown to the
        context is owned by nobody, so it makes the result empty.
        """
        nobody: frozenset[str] = frozenset()
        return self._obj_set.intersection(
            *(self._attr_to_objs.get(attr, nobody) for attr in frozenset(attrs))
        )

    def intent(self, objs: Iterable[str]) -> frozenset[str]:
        """Return the attributes common to **every** object in *objs* (A').

        Objects unknown to the context are skipped rather than treated as
        attribute-less; if nothing remains after skipping (including an
        empty *objs*), all attributes are returned.
        """
        known = [self._incidence[obj] for obj in objs if obj in self._incidence]
        if not known:
            return self._attr_set
        return frozenset.intersection(*known)

    def closure(self, attrs: Iterable[str]) -> frozenset[str]:
        """Return the double-prime closure B'' = intent(extent(B))."""
        carriers = self.extent(attrs)
        return self.intent(carriers)

    def has_attribute(self, obj: str, attr: str) -> bool:
        """Return ``True`` if *obj* is known and possesses *attr*."""
        owned = self._incidence.get(obj)
        return owned is not None and attr in owned

    def density(self) -> float:
        """Return the fraction of filled cells in the incidence matrix.

        A context with no objects or no attributes has density ``0.0``.
        """
        cells = self.object_count * self.attribute_count
        if not cells:
            return 0.0
        return sum(map(len, self._incidence.values())) / cells

    @classmethod
    def from_dict(cls, entity_attributes: dict[str, set[str]]) -> FormalContext:
        """Build a context from ``{object: {attr, ...}}``.

        The objects are the mapping's keys and the attribute universe is
        the union of all of its values.
        """
        universe: set[str] = set().union(*entity_attributes.values())
        return cls(list(entity_attributes), universe, entity_attributes)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class FormalConcept:
    """Immutable (extent, intent) pair describing one formal concept.

    In FCA terms this is a pair (A, B) with A' = B and B' = A.  The class
    only stores the two sets; it does not verify that they are mutually
    closed.
    """

    # Objects belonging to the concept (A).
    extent: frozenset[str]
    # Attributes belonging to the concept (B).
    intent: frozenset[str]

    @property
    def extent_size(self) -> int:
        """Number of objects in the extent."""
        return len(self.extent)

    @property
    def intent_size(self) -> int:
        """Number of attributes in the intent."""
        return len(self.intent)
|
||||
|
||||
|
||||
@dataclass
class ConceptLattice:
    """Container for the formal concepts of a :class:`FormalContext`.

    The order relation between concepts is extent inclusion
    (subconcept ≤ superconcept).  The ``concepts`` list itself is not
    sorted by that relation: it simply keeps the order in which the
    concepts were supplied or, for :meth:`from_context`, generated.
    """

    concepts: list[FormalConcept] = field(default_factory=list)

    @property
    def size(self) -> int:
        """How many formal concepts the lattice holds."""
        return len(self.concepts)

    @property
    def top(self) -> Optional[FormalConcept]:
        """Supremum — the concept with the largest extent, or ``None`` if empty."""
        return max(self.concepts, key=lambda c: c.extent_size, default=None)

    @property
    def bottom(self) -> Optional[FormalConcept]:
        """Infimum — the concept with the largest intent, or ``None`` if empty."""
        return max(self.concepts, key=lambda c: c.intent_size, default=None)

    @classmethod
    def from_context(cls, context: FormalContext) -> ConceptLattice:
        """Enumerate every formal concept of *context* with NextClosure.

        Closed intents are produced one after another, starting from the
        closure of the empty attribute set and stopping once the full
        attribute set has been reached.
        """
        attr_order = context.attributes  # sorted, fixed order
        if not attr_order:
            # Degenerate: no attributes → single concept with all objects
            only = FormalConcept(
                extent=frozenset(context.objects),
                intent=frozenset(),
            )
            return cls(concepts=[only])

        # The full attribute set is the last closed set, so reaching it
        # terminates the enumeration.
        everything = frozenset(attr_order)

        intent = context.closure(frozenset())
        found: list[FormalConcept] = [
            FormalConcept(extent=context.extent(intent), intent=intent)
        ]
        while intent != everything:
            successor = _next_closure(intent, attr_order, context.closure)
            if successor is None:
                break
            found.append(
                FormalConcept(extent=context.extent(successor), intent=successor)
            )
            intent = successor

        return cls(concepts=found)

    def gap_concepts(self) -> list[FormalConcept]:
        """Return the concepts that have no objects in their extent.

        For a lattice produced by :meth:`from_context`, an empty extent
        implies the intent is the full attribute set, so at most one
        concept can qualify in that case.
        """
        return [c for c in self.concepts if c.extent_size == 0]

    def concepts_with_extent_size(self, min_size: int = 0, max_size: Optional[int] = None) -> list[FormalConcept]:
        """Return concepts whose extent size lies in ``[min_size, max_size]``.

        ``max_size=None`` means no upper bound.  List order is preserved.
        """
        return [
            c
            for c in self.concepts
            if c.extent_size >= min_size
            and (max_size is None or c.extent_size <= max_size)
        ]

    def depth(self) -> int:
        """Return the number of concepts in the longest chain.

        A chain is a sequence c_1 < c_2 < ... < c_k where < is strict
        extent inclusion.  An empty lattice has depth 0.
        """
        if not self.concepts:
            return 0

        # A strict subset is always strictly smaller, so after sorting by
        # size every possible predecessor of an extent precedes it.
        by_size = sorted((c.extent for c in self.concepts), key=len)

        # chain[k] = length of the longest chain that ends at by_size[k].
        chain: list[int] = []
        for pos, ext in enumerate(by_size):
            best_below = max(
                (chain[k] for k in range(pos) if by_size[k] < ext),
                default=0,
            )
            chain.append(best_below + 1)

        return max(chain)
|
||||
|
||||
|
||||
def find_gap_concepts(
    context: FormalContext,
    lattice: Optional[ConceptLattice] = None,
) -> list[FormalConcept]:
    """Return the formal concepts whose extent is empty (coverage gaps).

    Such a concept is an attribute combination that appears in the
    lattice although no object possesses all of its attributes.

    Args:
        context: The formal context.  Only consulted when *lattice* is
            ``None``.
        lattice: Pre-computed lattice.  If ``None``, it is computed from
            *context*.

    Returns:
        The empty-extent concepts ordered by ascending intent size, i.e.
        the concepts with the fewest attributes come first.
    """
    source = ConceptLattice.from_context(context) if lattice is None else lattice
    empties = source.gap_concepts()
    empties.sort(key=lambda concept: concept.intent_size)
    return empties
|
||||
|
||||
|
||||
def find_empty_cells(
    context: FormalContext,
    dimension_a: Iterable[str],
    dimension_b: Iterable[str],
) -> list[tuple[str, str]]:
    """Find empty cells in a two-dimensional cross-tabulation.

    Given two collections of attributes (e.g. domain values and VSM
    systems), report every pair ``(attr_a, attr_b)`` for which
    ``context.extent([attr_a, attr_b])`` is empty, i.e. no object
    possesses both attributes.

    This is a simpler alternative to full FCA for two-dimensional
    coverage analysis.

    Args:
        context: The formal context to query.
        dimension_a: Attributes forming the first axis.
        dimension_b: Attributes forming the second axis.

    Returns:
        The empty pairs, ordered by ``attr_a`` and then ``attr_b``
        (both sorted ascending).  Duplicate entries in a dimension
        produce duplicate pairs.
    """
    # Sort each axis exactly once.  Besides avoiding a re-sort of the
    # second axis for every row, this keeps one-shot iterables (such as
    # generators) from being exhausted after the first row.
    rows = sorted(dimension_a)
    cols = sorted(dimension_b)
    return [
        (a, b)
        for a in rows
        for b in cols
        if not context.extent([a, b])
    ]
|
||||
|
||||
|
||||
# ── NextClosure internals ───────────────────────────────────────────
|
||||
|
||||
|
||||
def _next_closure(
|
||||
current: frozenset[str],
|
||||
attrs: list[str],
|
||||
closure_fn,
|
||||
) -> Optional[frozenset[str]]:
|
||||
"""Compute the next closed set in lectic order after *current*.
|
||||
|
||||
Implements Ganter's NextClosure algorithm.
|
||||
"""
|
||||
for i in range(len(attrs) - 1, -1, -1):
|
||||
m = attrs[i]
|
||||
if m in current:
|
||||
current = current - {m}
|
||||
else:
|
||||
candidate = current | {m}
|
||||
closed = closure_fn(candidate)
|
||||
# Canonicity test: no attribute before position i
|
||||
# was added by the closure
|
||||
canonical = True
|
||||
for j in range(i):
|
||||
if attrs[j] in closed and attrs[j] not in candidate:
|
||||
canonical = False
|
||||
break
|
||||
if canonical:
|
||||
return closed
|
||||
return None
|
||||
Reference in New Issue
Block a user