feat(infospace): add collection-level quality checks C1–C5 (S2.4)

Five concern checks: Redundancy (embedding/word overlap), Coverage
(FCA gap analysis), Coherence (graph connectivity), Consistency
(cycle detection), Granularity (Shannon entropy). Orchestrator runs
all or selected checks, CLI `markitect infospace check` command added.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-19 01:54:22 +01:00
parent 3461d2f354
commit 11585e6968
9 changed files with 1042 additions and 0 deletions

View File

@@ -0,0 +1,23 @@
"""
Collection-level quality checks for infospaces.
Five concerns: Redundancy (C1), Coverage (C2), Coherence (C3),
Consistency (C4), Granularity (C5).
"""
from markitect.infospace.checks.redundancy import check_redundancy
from markitect.infospace.checks.coverage import check_coverage
from markitect.infospace.checks.coherence import check_coherence
from markitect.infospace.checks.consistency import check_consistency
from markitect.infospace.checks.granularity import check_granularity
from markitect.infospace.checks.orchestrator import run_all_checks, CheckReport
# Public API of the checks package: the five concern checks (C1–C5) plus the
# orchestrator entry point and its unified report type.
__all__ = [
    "check_redundancy",
    "check_coverage",
    "check_coherence",
    "check_consistency",
    "check_granularity",
    "run_all_checks",
    "CheckReport",
]

View File

@@ -0,0 +1,81 @@
"""
C3 — Structural coherence.
Uses graph analysis to check that the entity relationship graph is
well-connected and has meaningful community structure.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from markitect.prompts.dependencies.models import DependencyGraph
@dataclass
class CoherenceReport:
    """Connectivity and community metrics produced by the C3 coherence check.

    Every field defaults to zero, so a default-constructed report represents
    "nothing analysed".
    """

    connected_components: int = 0
    largest_component_size: int = 0
    modularity: float = 0.0
    community_count: int = 0
    cohesion: float = 0.0
    coupling: float = 0.0
    entity_count: int = 0

    def to_dict(self) -> dict:
        """Return a plain-dict view of the report tagged ``"concern": "C3"``.

        The float metrics (modularity, cohesion, coupling) are rounded to
        four decimal places; integer fields are passed through unchanged.
        """
        payload: dict = {"concern": "C3"}
        payload["connected_components"] = self.connected_components
        payload["largest_component_size"] = self.largest_component_size
        payload["modularity"] = round(self.modularity, 4)
        payload["community_count"] = self.community_count
        payload["cohesion"] = round(self.cohesion, 4)
        payload["coupling"] = round(self.coupling, 4)
        payload["entity_count"] = self.entity_count
        return payload
def check_coherence(
    graph: Optional[DependencyGraph] = None,
    entity_count: int = 0,
) -> CoherenceReport:
    """Check structural coherence of the entity relationship graph.

    Args:
        graph: The entity relationship graph. If ``None`` or without nodes,
            a report with zero metrics is returned.
        entity_count: Total number of entities (for context). When it is
            ``0`` and the graph is analysed, the graph's node count is
            reported instead.

    Returns:
        :class:`CoherenceReport` with connectivity and community metrics.
    """
    if graph is None or len(graph.nodes) == 0:
        return CoherenceReport(entity_count=entity_count)

    # The graph-analysis helpers are imported lazily. If they cannot be
    # imported the check deliberately degrades to an all-zero report rather
    # than failing (presumably an optional dependency is missing — the test
    # suite skips with "networkx not available" in that situation).
    try:
        from markitect.analysis.graph import (
            connected_components,
            modularity_score,
            detect_communities,
            cohesion_coupling,
        )
    except ImportError:
        return CoherenceReport(entity_count=entity_count)

    components = connected_components(graph)
    # Fixed seed so repeated runs report the same communities.
    communities = detect_communities(graph, seed=42)
    mod = modularity_score(graph, communities=communities)
    cc = cohesion_coupling(graph, communities=communities)

    # Do not rely on the helper returning components sorted largest-first:
    # take the maximum size explicitly (0 when there are no components).
    largest = max((len(component) for component in components), default=0)

    return CoherenceReport(
        connected_components=len(components),
        largest_component_size=largest,
        modularity=mod,
        community_count=len(communities),
        cohesion=cc["cohesion"],
        coupling=cc["coupling"],
        entity_count=entity_count or len(graph.nodes),
    )

View File

@@ -0,0 +1,58 @@
"""
C4 — Definitional consistency.
Checks for cycles in the dependency graph and definitional conflicts
between entities.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from markitect.infospace.models import EntityMeta
from markitect.prompts.dependencies.models import DependencyGraph
@dataclass
class ConsistencyReport:
    """Outcome of the C4 consistency check: the cycles found and basic counts."""

    cycles: List[List[str]] = field(default_factory=list)
    cycle_count: int = 0
    entity_count: int = 0

    def to_dict(self) -> dict:
        """Return a plain-dict view of the report tagged ``"concern": "C4"``."""
        return dict(
            concern="C4",
            cycle_count=self.cycle_count,
            cycles=self.cycles,
            entity_count=self.entity_count,
        )
def check_consistency(
    entities: List[EntityMeta],
    graph: Optional[DependencyGraph] = None,
) -> ConsistencyReport:
    """Check definitional consistency by looking for dependency cycles.

    Args:
        entities: Entity metadata list; only its length is used here.
        graph: Optional dependency graph. Cycle detection is skipped when
            it is ``None`` or has no nodes.

    Returns:
        :class:`ConsistencyReport` listing the cycles reported by
        ``graph.detect_cycles()`` (empty when no graph was analysed).
    """
    total = len(entities)

    found: List[List[str]] = []
    graph_has_nodes = graph is not None and len(graph.nodes) > 0
    if graph_has_nodes:
        found = graph.detect_cycles()

    return ConsistencyReport(
        cycles=found,
        cycle_count=len(found),
        entity_count=total,
    )

View File

@@ -0,0 +1,111 @@
"""
C2 — Coverage completeness.
Uses FCA and cross-tabulation to detect structural coverage gaps:
attribute combinations (domain × source chapter; further attributes such
as VSM mappings can be merged in via ``extra_attributes``) with no entities.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from markitect.infospace.models import EntityMeta
from markitect.analysis.fca import FormalContext, find_empty_cells, find_gap_concepts
@dataclass
class CoverageReport:
    """Outcome of the C2 coverage check: ratio, gaps and per-domain counts."""

    coverage_ratio: float = 0.0
    empty_cells: List[dict] = field(default_factory=list)
    gap_concepts: List[dict] = field(default_factory=list)
    domain_counts: Dict[str, int] = field(default_factory=dict)
    entity_count: int = 0

    def to_dict(self) -> dict:
        """Return a plain-dict view of the report tagged ``"concern": "C2"``.

        The ratio is rounded to four decimals, and gap concepts are reported
        only as a count (``gap_concepts_count``), not in full.
        """
        return dict(
            concern="C2",
            coverage_ratio=round(self.coverage_ratio, 4),
            empty_cells=self.empty_cells,
            gap_concepts_count=len(self.gap_concepts),
            domain_counts=self.domain_counts,
            entity_count=self.entity_count,
        )
def _extract_attributes(entity: EntityMeta) -> set[str]:
"""Extract FCA attributes from an entity."""
attrs: set[str] = set()
if entity.domain:
attrs.add(f"domain:{entity.domain}")
if entity.source_chapter:
attrs.add(f"chapter:{entity.source_chapter}")
return attrs
def check_coverage(
    entities: List[EntityMeta],
    extra_attributes: Optional[Dict[str, set[str]]] = None,
) -> CoverageReport:
    """Check coverage completeness using FCA gap analysis.

    Args:
        entities: Entity metadata list.
        extra_attributes: Optional ``{slug: {attr, ...}}`` to merge
            with auto-extracted attributes (e.g. VSM mappings).

    Returns:
        :class:`CoverageReport` with gaps and coverage ratio. An empty
        entity list yields a default (all-zero) report.
    """
    total_entities = len(entities)
    if total_entities == 0:
        return CoverageReport()

    # One pass collects both the per-entity attribute sets and the
    # per-domain tallies.
    attrs_by_slug: Dict[str, set[str]] = {}
    per_domain: Dict[str, int] = {}
    for entity in entities:
        collected = _extract_attributes(entity)
        if extra_attributes and entity.slug in extra_attributes:
            collected.update(extra_attributes[entity.slug])
        attrs_by_slug[entity.slug] = collected

        label = entity.domain or "(unspecified)"
        per_domain[label] = per_domain.get(label, 0) + 1

    context = FormalContext.from_dict(attrs_by_slug)

    # The two cross-tabulation axes are the distinct domain:/chapter: attributes.
    every_attr = set().union(*attrs_by_slug.values())
    domains = sorted(a for a in every_attr if a.startswith("domain:"))
    chapters = sorted(a for a in every_attr if a.startswith("chapter:"))
    can_cross_tabulate = bool(domains and chapters)

    missing = []
    if can_cross_tabulate:
        for dim_a, dim_b in find_empty_cells(context, domains, chapters):
            missing.append({"dimension_a": dim_a, "dimension_b": dim_b})

    gap_summaries = []
    for concept in find_gap_concepts(context):
        if concept.intent_size > 4:
            # Only report manageable gaps
            continue
        gap_summaries.append(
            {"intent": sorted(concept.intent), "extent_size": concept.extent_size}
        )

    # Coverage ratio: populated cells / total possible cells. Without both
    # axes the grid is treated as a single populated cell, so the ratio is 1.0.
    cell_total = len(domains) * len(chapters) if can_cross_tabulate else 1
    ratio = (cell_total - len(missing)) / cell_total

    return CoverageReport(
        coverage_ratio=ratio,
        empty_cells=missing,
        gap_concepts=gap_summaries,
        domain_counts=per_domain,
        entity_count=total_entities,
    )

View File

@@ -0,0 +1,98 @@
"""
C5 — Granularity balance.
Checks that entities are at a consistent level of abstraction,
measured by word count distribution and Shannon entropy of domain
assignments.
"""
from __future__ import annotations
import math
from dataclasses import dataclass, field
from typing import Dict, List
from markitect.infospace.models import EntityMeta
@dataclass
class GranularityReport:
    """Outcome of the C5 granularity check: domain entropy and word-count stats."""

    domain_entropy: float = 0.0
    word_count_stats: Dict[str, float] = field(default_factory=dict)
    domain_distribution: Dict[str, int] = field(default_factory=dict)
    entity_count: int = 0

    def to_dict(self) -> dict:
        """Return a plain-dict view of the report tagged ``"concern": "C5"``.

        Entropy is rounded to four decimals and each word-count statistic
        to two.
        """
        rounded_stats = {}
        for stat_name, stat_value in self.word_count_stats.items():
            rounded_stats[stat_name] = round(stat_value, 2)
        return dict(
            concern="C5",
            domain_entropy=round(self.domain_entropy, 4),
            word_count_stats=rounded_stats,
            domain_distribution=self.domain_distribution,
            entity_count=self.entity_count,
        )
def _shannon_entropy(counts: Dict[str, int]) -> float:
"""Compute Shannon entropy of a distribution."""
total = sum(counts.values())
if total == 0:
return 0.0
entropy = 0.0
for count in counts.values():
if count > 0:
p = count / total
entropy -= p * math.log2(p)
return entropy
def check_granularity(entities: List[EntityMeta]) -> GranularityReport:
    """Check granularity balance across entities.

    Metrics:
        - Domain entropy: higher = more balanced distribution. Entities
          without a domain are counted under ``"(unspecified)"``.
        - Word count statistics over ``definition_word_count``: mean, min,
          max and population standard deviation.

    Args:
        entities: Entity metadata list.

    Returns:
        :class:`GranularityReport` with balance metrics (a default report
        when *entities* is empty).
    """
    total = len(entities)
    if total == 0:
        return GranularityReport()

    # Tally entities per domain, preserving first-seen order.
    tally: Dict[str, int] = {}
    for entity in entities:
        label = entity.domain or "(unspecified)"
        if label in tally:
            tally[label] += 1
        else:
            tally[label] = 1
    entropy = _shannon_entropy(tally)

    # Word-count statistics; the list is non-empty because total > 0.
    lengths = [entity.definition_word_count for entity in entities]
    sample_size = len(lengths)
    average = sum(lengths) / sample_size
    spread = math.sqrt(
        sum((wc - average) ** 2 for wc in lengths) / sample_size
    )
    stats = {
        "mean": average,
        "min": float(min(lengths)),
        "max": float(max(lengths)),
        "std": spread,
    }

    return GranularityReport(
        domain_entropy=entropy,
        word_count_stats=stats,
        domain_distribution=tally,
        entity_count=total,
    )

View File

@@ -0,0 +1,102 @@
"""
Unified orchestrator for all five collection-level checks.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from markitect.infospace.models import EntityMeta
from markitect.prompts.dependencies.models import DependencyGraph
from .redundancy import RedundancyReport, check_redundancy
from .coverage import CoverageReport, check_coverage
from .coherence import CoherenceReport, check_coherence
from .consistency import ConsistencyReport, check_consistency
from .granularity import GranularityReport, check_granularity
@dataclass
class CheckReport:
    """Unified report from all five collection-level checks.

    Each field holds the report of one concern, or ``None`` when that
    check was not run.
    """

    redundancy: Optional[RedundancyReport] = None
    coverage: Optional[CoverageReport] = None
    coherence: Optional[CoherenceReport] = None
    consistency: Optional[ConsistencyReport] = None
    granularity: Optional[GranularityReport] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialise the sub-reports that are present, keyed by concern name."""
        sections = (
            ("redundancy", self.redundancy),
            ("coverage", self.coverage),
            ("coherence", self.coherence),
            ("consistency", self.consistency),
            ("granularity", self.granularity),
        )
        return {name: section.to_dict() for name, section in sections if section}

    def metrics(self) -> Dict[str, float]:
        """Extract key metrics for viability checking.

        Only metrics of sub-reports that are present are included; count
        metrics are converted to ``float``.
        """
        summary: Dict[str, float] = {}

        redundancy = self.redundancy
        if redundancy:
            summary.update(redundancy_ratio=redundancy.redundancy_ratio)

        coverage = self.coverage
        if coverage:
            summary.update(coverage_ratio=coverage.coverage_ratio)

        coherence = self.coherence
        if coherence:
            summary.update(
                coherence_components=float(coherence.connected_components),
                modularity=coherence.modularity,
            )

        consistency = self.consistency
        if consistency:
            summary.update(consistency_cycles=float(consistency.cycle_count))

        granularity = self.granularity
        if granularity:
            summary.update(granularity_entropy=granularity.domain_entropy)

        return summary
def run_all_checks(
    entities: List[EntityMeta],
    embeddings: Optional[Dict[str, list[float]]] = None,
    graph: Optional[DependencyGraph] = None,
    extra_attributes: Optional[Dict[str, set[str]]] = None,
    checks: Optional[List[str]] = None,
) -> CheckReport:
    """Run all (or selected) collection-level checks.

    Args:
        entities: Entity metadata list.
        embeddings: Pre-computed embedding vectors for C1.
        graph: Entity relationship graph for C3 and C4.
        extra_attributes: Extra FCA attributes for C2.
        checks: List of check names to run. If ``None``, runs all five;
            an empty list runs none. Valid names: ``redundancy``,
            ``coverage``, ``coherence``, ``consistency``, ``granularity``.
            Unrecognised names are ignored.

    Returns:
        :class:`CheckReport` with results from each check that ran; the
        fields of checks that were not selected stay ``None``.
    """
    selected = set(checks) if checks else set()
    everything = checks is None

    def wanted(name: str) -> bool:
        # A check runs when no selection was given or when it was named.
        return everything or name in selected

    outcome = CheckReport()

    if wanted("redundancy"):
        outcome.redundancy = check_redundancy(entities, embeddings=embeddings)

    if wanted("coverage"):
        outcome.coverage = check_coverage(
            entities, extra_attributes=extra_attributes
        )

    if wanted("coherence"):
        outcome.coherence = check_coherence(
            graph=graph, entity_count=len(entities)
        )

    if wanted("consistency"):
        outcome.consistency = check_consistency(entities, graph=graph)

    if wanted("granularity"):
        outcome.granularity = check_granularity(entities)

    return outcome

View File

@@ -0,0 +1,98 @@
"""
C1 — Redundancy detection.
Uses embedding similarity to find entity pairs with overlapping
meanings that may be candidates for merging.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from markitect.infospace.models import EntityMeta
from markitect.llm.similarity import find_similar_pairs
@dataclass
class RedundancyReport:
    """Outcome of the C1 redundancy check: flagged pairs and the overall ratio."""

    similar_pairs: List[dict] = field(default_factory=list)
    redundancy_ratio: float = 0.0
    entity_count: int = 0

    def to_dict(self) -> dict:
        """Return a plain-dict view of the report tagged ``"concern": "C1"``.

        The ratio is rounded to four decimal places.
        """
        return dict(
            concern="C1",
            redundancy_ratio=round(self.redundancy_ratio, 4),
            similar_pairs=self.similar_pairs,
            entity_count=self.entity_count,
        )
def check_redundancy(
    entities: List[EntityMeta],
    embeddings: Optional[Dict[str, list[float]]] = None,
    threshold: float = 0.85,
) -> RedundancyReport:
    """Check for redundant entities using embedding similarity.

    Args:
        entities: Entity metadata list.
        embeddings: Pre-computed ``{slug: vector}`` mapping.
            If ``None`` (or empty), redundancy is checked structurally
            via the overlap of definition words.
        threshold: Similarity threshold for flagging pairs.

    Returns:
        :class:`RedundancyReport` with similar pairs and ratio. With fewer
        than two entities there is nothing to compare and the report is
        empty.
    """
    n = len(entities)
    if n < 2:
        return RedundancyReport(entity_count=n)

    pairs: list[dict] = []
    if embeddings:
        # Embedding-based similarity. Pairs that mention a slug outside the
        # given entities (e.g. a stale vector) are dropped; otherwise the
        # ratio below could count non-entities and exceed 1.0.
        known_slugs = {e.slug for e in entities}
        for slug_a, slug_b, sim in find_similar_pairs(embeddings, threshold=threshold):
            if slug_a not in known_slugs or slug_b not in known_slugs:
                continue
            pairs.append({
                "entity_a": slug_a,
                "entity_b": slug_b,
                "similarity": round(sim, 4),
                "method": "embedding",
            })
    else:
        # Fallback: structural overlap (shared definition words), measured
        # as |A ∩ B| / min(|A|, |B|) on lower-cased whitespace tokens.
        slug_to_words = {
            e.slug: set(e.definition.lower().split()) if e.definition else set()
            for e in entities
        }
        slugs = sorted(slug_to_words)
        for i, a in enumerate(slugs):
            wa = slug_to_words[a]
            if not wa:
                continue  # an empty definition cannot overlap with anything
            for b in slugs[i + 1:]:
                wb = slug_to_words[b]
                if not wb:
                    continue
                overlap = len(wa & wb) / min(len(wa), len(wb))
                if overlap >= threshold:
                    pairs.append({
                        "entity_a": a,
                        "entity_b": b,
                        "similarity": round(overlap, 4),
                        "method": "word_overlap",
                    })

    # redundancy_ratio: fraction of entities involved in similar pairs
    # (n >= 2 here, so the division is always defined).
    involved = {slug for p in pairs for slug in (p["entity_a"], p["entity_b"])}
    ratio = len(involved) / n

    return RedundancyReport(
        similar_pairs=pairs,
        redundancy_ratio=ratio,
        entity_count=n,
    )

View File

@@ -273,3 +273,61 @@ def viability(config_path: Optional[str]):
click.echo(f"Viable: YES ({state.viability_pass_count}/{state.viability_total_count} thresholds met)")
else:
click.echo(f"Viable: NO ({state.viability_pass_count}/{state.viability_total_count} thresholds met)")
# ── check ───────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option(
    "--concern", "concerns", multiple=True,
    type=click.Choice(["redundancy", "coverage", "coherence", "consistency", "granularity"]),
    help="Run specific concern(s). Omit to run all five.",
)
@click.option("--json", "as_json", is_flag=True, help="Output results as JSON.")
def check(config_path: Optional[str], concerns: tuple, as_json: bool):
    """Run collection-level quality checks (C1–C5)."""
    # NOTE: the docstring above doubles as the command's --help text, so the
    # details live in comments instead.
    #
    # config_path: path to infospace.yaml, or None to use the loader's default.
    # concerns:    names chosen via --concern; an empty tuple means "run all".
    # as_json:     emit the report as JSON instead of the text listing.
    cfg, cfg_path = _load_config_or_exit(config_path)
    root = cfg_path.parent
    entities_dir = root / cfg.entities_dir
    if not entities_dir.is_dir():
        click.echo("Error: No entities directory found.", err=True)
        raise SystemExit(1)

    entity_list = parse_entity_directory(entities_dir)
    if not entity_list:
        click.echo("No entities to check.")
        return

    # Imported here rather than at module level, as in the original command.
    from markitect.infospace.checks import run_all_checks

    # No embeddings or graph are supplied here, so the checks run with
    # entity metadata only.
    checks_list = list(concerns) if concerns else None
    report = run_all_checks(
        entities=entity_list,
        checks=checks_list,
    )

    if as_json:
        import json
        click.echo(json.dumps(report.to_dict(), indent=2))
        return

    click.echo(f"Collection checks — {len(entity_list)} entities\n")
    for concern_name, concern_data in report.to_dict().items():
        label = concern_data.get("concern", concern_name.upper())
        # Separate the concern id from its name so the header reads
        # "C1 — redundancy" instead of the fused "C1redundancy".
        click.echo(f" {label} — {concern_name}")
        for k, v in concern_data.items():
            if k == "concern":
                continue
            click.echo(f" {k}: {v}")
        click.echo()

    # Show summary metrics
    m = report.metrics()
    if m:
        click.echo("Metrics summary:")
        for k, v in sorted(m.items()):
            click.echo(f" {k}: {v:.4f}")

View File

@@ -0,0 +1,413 @@
"""
Tests for collection-level quality checks (S2.4).
Covers all five concerns: Redundancy (C1), Coverage (C2), Coherence (C3),
Consistency (C4), Granularity (C5), and the orchestrator.
"""
from __future__ import annotations
import math
import pytest
from markitect.infospace.models import EntityMeta
from markitect.prompts.dependencies.models import DependencyGraph
# ── helpers ──────────────────────────────────────────────────────────
def _entity(slug: str, domain: str = "", definition: str = "",
            source_chapter: str = "", word_count: int = 0) -> EntityMeta:
    """Build a minimal :class:`EntityMeta` for tests.

    The title is derived from the slug. The word count is *word_count*
    when non-zero, otherwise the number of whitespace-separated words in
    *definition* (0 for an empty definition).
    """
    if word_count:
        words = word_count
    elif definition:
        words = len(definition.split())
    else:
        words = 0

    display_name = slug.replace("-", " ").title()
    return EntityMeta(
        slug=slug,
        title=display_name,
        h1_raw=display_name,
        definition=definition,
        domain=domain,
        source_chapter=source_chapter,
        definition_word_count=words,
        total_word_count=words,
    )
def _sample_entities() -> list[EntityMeta]:
    """Return five entities spread over three domains and three chapters."""
    rows = (
        ("alpha", "economics", "the first concept in our model", "ch01"),
        ("beta", "economics", "the second concept about markets", "ch01"),
        ("gamma", "sociology", "a social structure framework", "ch02"),
        ("delta", "sociology", "a social dynamic pattern", "ch02"),
        ("epsilon", "philosophy", "an epistemic principle", "ch03"),
    )
    return [
        _entity(slug, domain=domain, definition=text, source_chapter=chapter)
        for slug, domain, text, chapter in rows
    ]
def _linear_graph() -> DependencyGraph:
    """Build the acyclic chain A -> B -> C -> D."""
    chain = DependencyGraph()
    for source, target in (("A", "B"), ("B", "C"), ("C", "D")):
        chain.add_edge(source, target)
    return chain
def _cyclic_graph() -> DependencyGraph:
    """Build the three-node loop A -> B -> C -> A (one cycle)."""
    loop = DependencyGraph()
    for source, target in (("A", "B"), ("B", "C"), ("C", "A")):
        loop.add_edge(source, target)
    return loop
def _can_import_graph_analysis():
try:
from markitect.analysis.graph import connected_components # noqa: F401
return True
except ImportError:
return False
# ── C1: Redundancy ──────────────────────────────────────────────────
class TestRedundancy:
    """C1: word-overlap fallback, embedding path and report serialisation."""

    def test_empty_entities(self):
        """No entities gives an empty report with a zero ratio."""
        from markitect.infospace.checks.redundancy import check_redundancy

        result = check_redundancy([])

        assert result.entity_count == 0
        assert result.redundancy_ratio == 0.0
        assert result.similar_pairs == []

    def test_single_entity(self):
        """A lone entity has nothing to be redundant with."""
        from markitect.infospace.checks.redundancy import check_redundancy

        only = _entity("a", definition="hello world")
        result = check_redundancy([only])

        assert result.entity_count == 1
        assert result.redundancy_ratio == 0.0

    def test_no_overlap_word_fallback(self):
        """Disjoint definitions produce no pairs in the fallback path."""
        from markitect.infospace.checks.redundancy import check_redundancy

        first = _entity("a", definition="apple banana cherry")
        second = _entity("b", definition="delta epsilon zeta")
        result = check_redundancy([first, second], threshold=0.5)

        assert result.similar_pairs == []
        assert result.redundancy_ratio == 0.0

    def test_high_overlap_word_fallback(self):
        """Definitions sharing most words are flagged via word overlap."""
        from markitect.infospace.checks.redundancy import check_redundancy

        first = _entity("a", definition="the quick brown fox")
        second = _entity("b", definition="the quick brown dog")
        result = check_redundancy([first, second], threshold=0.5)

        assert len(result.similar_pairs) == 1
        flagged = result.similar_pairs[0]
        assert flagged["method"] == "word_overlap"
        assert flagged["entity_a"] == "a"
        assert flagged["entity_b"] == "b"
        assert result.redundancy_ratio == 1.0  # both entities involved

    def test_embedding_based(self):
        """Near-parallel vectors are flagged through the embedding path."""
        from markitect.infospace.checks.redundancy import check_redundancy

        collection = [
            _entity(slug, definition=text)
            for slug, text in (("a", "x"), ("b", "y"), ("c", "z"))
        ]
        # a and b are very similar; c is different
        vectors = {
            "a": [1.0, 0.0, 0.0],
            "b": [0.99, 0.1, 0.0],
            "c": [0.0, 0.0, 1.0],
        }
        result = check_redundancy(collection, embeddings=vectors, threshold=0.9)

        assert len(result.similar_pairs) >= 1
        assert result.similar_pairs[0]["method"] == "embedding"
        assert result.redundancy_ratio > 0.0

    def test_to_dict(self):
        """Serialisation carries the concern tag and the scalar fields."""
        from markitect.infospace.checks.redundancy import RedundancyReport

        payload = RedundancyReport(
            similar_pairs=[], redundancy_ratio=0.25, entity_count=10
        ).to_dict()

        assert payload["concern"] == "C1"
        assert payload["redundancy_ratio"] == 0.25
        assert payload["entity_count"] == 10
# ── C2: Coverage ────────────────────────────────────────────────────
class TestCoverage:
    """C2: domain × chapter cross-tabulation, domain tallies and serialisation."""

    def test_empty_entities(self):
        """No entities gives a default report."""
        from markitect.infospace.checks.coverage import check_coverage

        result = check_coverage([])

        assert result.entity_count == 0
        assert result.coverage_ratio == 0.0

    def test_full_coverage(self):
        """All domain×chapter cells are populated."""
        from markitect.infospace.checks.coverage import check_coverage

        grid = [
            _entity(slug, domain=domain, source_chapter=chapter)
            for slug, domain, chapter in (
                ("a", "d1", "ch1"),
                ("b", "d2", "ch1"),
                ("c", "d1", "ch2"),
                ("d", "d2", "ch2"),
            )
        ]
        result = check_coverage(grid)

        assert result.coverage_ratio == 1.0
        assert result.empty_cells == []

    def test_partial_coverage(self):
        """One cell is missing → coverage < 1.0."""
        from markitect.infospace.checks.coverage import check_coverage

        # Missing: d2×ch2
        grid = [
            _entity(slug, domain=domain, source_chapter=chapter)
            for slug, domain, chapter in (
                ("a", "d1", "ch1"),
                ("b", "d2", "ch1"),
                ("c", "d1", "ch2"),
            )
        ]
        result = check_coverage(grid)

        assert result.coverage_ratio < 1.0
        assert len(result.empty_cells) == 1
        gap = result.empty_cells[0]
        assert gap["dimension_a"] == "domain:d2"
        assert gap["dimension_b"] == "chapter:ch2"

    def test_domain_counts(self):
        """Entities are tallied per domain."""
        from markitect.infospace.checks.coverage import check_coverage

        tallies = check_coverage(_sample_entities()).domain_counts

        assert tallies["economics"] == 2
        assert tallies["sociology"] == 2
        assert tallies["philosophy"] == 1

    def test_to_dict(self):
        """Serialisation carries the concern tag and the ratio."""
        from markitect.infospace.checks.coverage import CoverageReport

        payload = CoverageReport(coverage_ratio=0.75, entity_count=8).to_dict()

        assert payload["concern"] == "C2"
        assert payload["coverage_ratio"] == 0.75

    def test_extra_attributes(self):
        """Extra attributes for a slug are accepted alongside extracted ones."""
        from markitect.infospace.checks.coverage import check_coverage

        collection = [_entity("a", domain="d1", source_chapter="ch1")]
        supplement = {"a": {"vsm:production"}}
        result = check_coverage(collection, extra_attributes=supplement)

        assert result.entity_count == 1
# ── C3: Coherence ───────────────────────────────────────────────────
class TestCoherence:
    """C3: behaviour without a graph, with an empty graph and with a real one."""

    def test_no_graph(self):
        """Without a graph the metrics are zero but the entity count is kept."""
        from markitect.infospace.checks.coherence import check_coherence

        result = check_coherence(graph=None, entity_count=5)

        assert result.connected_components == 0
        assert result.entity_count == 5

    def test_empty_graph(self):
        """A graph without nodes is treated like no graph."""
        from markitect.infospace.checks.coherence import check_coherence

        blank = DependencyGraph()
        result = check_coherence(graph=blank, entity_count=0)

        assert result.connected_components == 0

    def test_to_dict(self):
        """Serialisation carries the concern tag and the supplied metrics."""
        from markitect.infospace.checks.coherence import CoherenceReport

        payload = CoherenceReport(
            connected_components=2, modularity=0.3456, entity_count=10
        ).to_dict()

        assert payload["concern"] == "C3"
        assert payload["modularity"] == 0.3456
        assert payload["connected_components"] == 2

    @pytest.mark.skipif(
        not _can_import_graph_analysis(),
        reason="networkx not available",
    )
    def test_with_graph(self):
        """A linear chain yields at least one connected component."""
        from markitect.infospace.checks.coherence import check_coherence

        chain = _linear_graph()
        result = check_coherence(graph=chain, entity_count=4)

        assert result.connected_components >= 1
        assert result.entity_count == 4
# ── C4: Consistency ─────────────────────────────────────────────────
class TestConsistency:
    """C4: cycle detection with no graph, an acyclic graph and a cyclic one."""

    def test_no_graph(self):
        """Without a graph no cycles are reported; entities are still counted."""
        from markitect.infospace.checks.consistency import check_consistency

        result = check_consistency(_sample_entities())

        assert result.cycle_count == 0
        assert result.entity_count == 5

    def test_acyclic_graph(self):
        """A linear chain contains no cycles."""
        from markitect.infospace.checks.consistency import check_consistency

        collection = _sample_entities()
        chain = _linear_graph()
        result = check_consistency(collection, graph=chain)

        assert result.cycle_count == 0

    def test_cyclic_graph(self):
        """A three-node loop is reported as at least one cycle."""
        from markitect.infospace.checks.consistency import check_consistency

        collection = _sample_entities()
        loop = _cyclic_graph()
        result = check_consistency(collection, graph=loop)

        assert result.cycle_count >= 1
        assert len(result.cycles) >= 1

    def test_to_dict(self):
        """Serialisation carries the concern tag and the cycle count."""
        from markitect.infospace.checks.consistency import ConsistencyReport

        payload = ConsistencyReport(
            cycles=[["A", "B", "A"]], cycle_count=1, entity_count=5
        ).to_dict()

        assert payload["concern"] == "C4"
        assert payload["cycle_count"] == 1
# ── C5: Granularity ─────────────────────────────────────────────────
class TestGranularity:
    """C5: domain entropy, word-count statistics and serialisation."""

    def test_empty_entities(self):
        """No entities gives a default report."""
        from markitect.infospace.checks.granularity import check_granularity

        result = check_granularity([])

        assert result.entity_count == 0
        assert result.domain_entropy == 0.0

    def test_single_domain(self):
        """All entities in one domain means zero entropy."""
        from markitect.infospace.checks.granularity import check_granularity

        collection = [
            _entity("a", domain="d1", word_count=10),
            _entity("b", domain="d1", word_count=20),
        ]
        result = check_granularity(collection)

        assert result.domain_entropy == 0.0  # single domain = zero entropy
        assert result.entity_count == 2
        assert result.word_count_stats["mean"] == 15.0

    def test_balanced_domains(self):
        """Two equally populated domains give one bit of entropy."""
        from markitect.infospace.checks.granularity import check_granularity

        collection = [
            _entity("a", domain="d1", word_count=10),
            _entity("b", domain="d2", word_count=10),
        ]
        result = check_granularity(collection)

        assert result.domain_entropy == pytest.approx(1.0)  # log2(2) = 1.0
        assert result.domain_distribution == {"d1": 1, "d2": 1}

    def test_word_count_stats(self):
        """Mean, min, max and standard deviation for word counts 10 and 30."""
        from markitect.infospace.checks.granularity import check_granularity

        collection = [
            _entity("a", domain="d1", word_count=10),
            _entity("b", domain="d1", word_count=30),
        ]
        stats = check_granularity(collection).word_count_stats

        assert stats["mean"] == 20.0
        assert stats["min"] == 10.0
        assert stats["max"] == 30.0
        assert stats["std"] == 10.0

    def test_to_dict(self):
        """Serialisation carries the concern tag and the entropy."""
        from markitect.infospace.checks.granularity import GranularityReport

        payload = GranularityReport(domain_entropy=1.5, entity_count=4).to_dict()

        assert payload["concern"] == "C5"
        assert payload["domain_entropy"] == 1.5

    def test_unspecified_domain(self):
        """Entities without a domain are tallied under "(unspecified)"."""
        from markitect.infospace.checks.granularity import check_granularity

        nameless = [_entity("a", domain="", word_count=10)]
        result = check_granularity(nameless)

        assert "(unspecified)" in result.domain_distribution
# ── Orchestrator ────────────────────────────────────────────────────
class TestOrchestrator:
    """Orchestrator: check selection, serialisation and metric extraction."""

    def test_run_all_default(self):
        """With no selection every one of the five sub-reports is filled in."""
        from markitect.infospace.checks.orchestrator import run_all_checks

        outcome = run_all_checks(_sample_entities())

        assert outcome.redundancy is not None
        assert outcome.coverage is not None
        assert outcome.coherence is not None
        assert outcome.consistency is not None
        assert outcome.granularity is not None

    def test_run_selected_checks(self):
        """Only the named checks run; the others stay ``None``."""
        from markitect.infospace.checks.orchestrator import run_all_checks

        chosen = ["redundancy", "granularity"]
        outcome = run_all_checks(_sample_entities(), checks=chosen)

        assert outcome.redundancy is not None
        assert outcome.granularity is not None
        assert outcome.coverage is None
        assert outcome.coherence is None
        assert outcome.consistency is None

    def test_to_dict(self):
        """Serialisation only includes the checks that ran."""
        from markitect.infospace.checks.orchestrator import run_all_checks

        outcome = run_all_checks(_sample_entities(), checks=["granularity"])
        payload = outcome.to_dict()

        assert "granularity" in payload
        assert "redundancy" not in payload

    def test_metrics(self):
        """Metrics of the selected checks are exposed as floats."""
        from markitect.infospace.checks.orchestrator import run_all_checks

        chosen = ["redundancy", "granularity"]
        summary = run_all_checks(_sample_entities(), checks=chosen).metrics()

        assert "redundancy_ratio" in summary
        assert "granularity_entropy" in summary
        assert isinstance(summary["redundancy_ratio"], float)
        assert isinstance(summary["granularity_entropy"], float)

    def test_metrics_empty_report(self):
        """A report with no sub-reports has no metrics."""
        from markitect.infospace.checks.orchestrator import CheckReport

        blank = CheckReport()

        assert blank.metrics() == {}

    def test_run_all_with_graph(self):
        """An acyclic graph passed through the orchestrator reports no cycles."""
        from markitect.infospace.checks.orchestrator import run_all_checks

        chain = _linear_graph()
        outcome = run_all_checks(
            _sample_entities(), graph=chain, checks=["consistency"]
        )

        assert outcome.consistency is not None
        assert outcome.consistency.cycle_count == 0

    def test_run_all_with_cyclic_graph(self):
        """A cyclic graph passed through the orchestrator reports a cycle."""
        from markitect.infospace.checks.orchestrator import run_all_checks

        loop = _cyclic_graph()
        outcome = run_all_checks(
            _sample_entities(), graph=loop, checks=["consistency"]
        )

        assert outcome.consistency.cycle_count >= 1
# ── Shannon entropy helper ──────────────────────────────────────────
class TestShannonEntropy:
    """Direct tests of the ``_shannon_entropy`` helper."""

    def test_uniform_distribution(self):
        """Four equally likely outcomes carry two bits."""
        from markitect.infospace.checks.granularity import _shannon_entropy

        uniform = {"a": 1, "b": 1, "c": 1, "d": 1}

        assert _shannon_entropy(uniform) == pytest.approx(2.0)  # log2(4)

    def test_single_element(self):
        """A single outcome carries no information."""
        from markitect.infospace.checks.granularity import _shannon_entropy

        assert _shannon_entropy({"a": 10}) == 0.0

    def test_empty(self):
        """An empty distribution has zero entropy."""
        from markitect.infospace.checks.granularity import _shannon_entropy

        assert _shannon_entropy({}) == 0.0

    def test_skewed(self):
        """A heavily skewed two-outcome distribution lies strictly between 0 and 1 bit."""
        from markitect.infospace.checks.granularity import _shannon_entropy

        lopsided = {"a": 99, "b": 1}
        bits = _shannon_entropy(lopsided)

        assert 0.0 < bits < 1.0