diff --git a/markitect/infospace/checks/__init__.py b/markitect/infospace/checks/__init__.py new file mode 100644 index 00000000..3ec9625e --- /dev/null +++ b/markitect/infospace/checks/__init__.py @@ -0,0 +1,23 @@ +""" +Collection-level quality checks for infospaces. + +Five concerns: Redundancy (C1), Coverage (C2), Coherence (C3), +Consistency (C4), Granularity (C5). +""" + +from markitect.infospace.checks.redundancy import check_redundancy +from markitect.infospace.checks.coverage import check_coverage +from markitect.infospace.checks.coherence import check_coherence +from markitect.infospace.checks.consistency import check_consistency +from markitect.infospace.checks.granularity import check_granularity +from markitect.infospace.checks.orchestrator import run_all_checks, CheckReport + +__all__ = [ + "check_redundancy", + "check_coverage", + "check_coherence", + "check_consistency", + "check_granularity", + "run_all_checks", + "CheckReport", +] diff --git a/markitect/infospace/checks/coherence.py b/markitect/infospace/checks/coherence.py new file mode 100644 index 00000000..ffce5691 --- /dev/null +++ b/markitect/infospace/checks/coherence.py @@ -0,0 +1,81 @@ +""" +C3 — Structural coherence. + +Uses graph analysis to check that the entity relationship graph is +well-connected and has meaningful community structure. 
@dataclass
class CoherenceReport:
    """Results from the C3 (structural coherence) analysis.

    Every field defaults to zero, which is also what
    :func:`check_coherence` reports when no usable graph is available.
    """

    connected_components: int = 0  # number of connected components
    largest_component_size: int = 0  # node count of the biggest component
    modularity: float = 0.0
    community_count: int = 0
    cohesion: float = 0.0
    coupling: float = 0.0
    entity_count: int = 0

    def to_dict(self) -> dict:
        """Return a plain dict tagged ``"concern": "C3"``.

        Float metrics are rounded to four decimal places.
        """
        return {
            "concern": "C3",
            "connected_components": self.connected_components,
            "largest_component_size": self.largest_component_size,
            "modularity": round(self.modularity, 4),
            "community_count": self.community_count,
            "cohesion": round(self.cohesion, 4),
            "coupling": round(self.coupling, 4),
            "entity_count": self.entity_count,
        }


def check_coherence(
    graph: Optional[DependencyGraph] = None,
    entity_count: int = 0,
) -> CoherenceReport:
    """Check structural coherence of the entity relationship graph.

    Args:
        graph: The entity relationship graph. If ``None`` or without
            nodes, a report with zero metrics is returned.
        entity_count: Total number of entities (for context). When it is
            zero and the analysis runs, the graph's node count is used.

    Returns:
        :class:`CoherenceReport` with connectivity and community metrics.
        If ``markitect.analysis.graph`` cannot be imported, the report
        carries only ``entity_count`` (best-effort degradation).
    """
    if graph is None or len(graph.nodes) == 0:
        return CoherenceReport(entity_count=entity_count)

    try:
        from markitect.analysis.graph import (
            connected_components,
            modularity_score,
            detect_communities,
            cohesion_coupling,
        )
    except ImportError:
        # Graph analysis is optional; report zeros rather than fail.
        return CoherenceReport(entity_count=entity_count)

    # Materialise so len() works even if the helper yields lazily.
    components = list(connected_components(graph))
    # Fixed seed keeps community detection reproducible between runs.
    communities = detect_communities(graph, seed=42)
    mod = modularity_score(graph, communities=communities)
    cc = cohesion_coupling(graph, communities=communities)

    return CoherenceReport(
        connected_components=len(components),
        # Do not rely on the helper returning components largest-first.
        largest_component_size=max((len(c) for c in components), default=0),
        modularity=mod,
        community_count=len(communities),
        cohesion=cc["cohesion"],
        coupling=cc["coupling"],
        entity_count=entity_count or len(graph.nodes),
    )


@dataclass
class ConsistencyReport:
    """Results from the C4 (definitional consistency) analysis."""

    cycles: List[List[str]] = field(default_factory=list)
    cycle_count: int = 0
    entity_count: int = 0

    def to_dict(self) -> dict:
        """Return a plain dict tagged ``"concern": "C4"``."""
        return {
            "concern": "C4",
            "cycle_count": self.cycle_count,
            "cycles": self.cycles,
            "entity_count": self.entity_count,
        }


def check_consistency(
    entities: List[EntityMeta],
    graph: Optional[DependencyGraph] = None,
) -> ConsistencyReport:
    """Check definitional consistency.

    Args:
        entities: Entity metadata list; only its length is used here.
        graph: Optional dependency graph for cycle detection. Without a
            graph (or with an empty one) no cycles are reported.

    Returns:
        :class:`ConsistencyReport` with the cycles found.
    """
    cycles: List[List[str]] = []
    if graph is not None and len(graph.nodes) > 0:
        # Copy so the report does not alias the graph's own return value.
        cycles = list(graph.detect_cycles())

    return ConsistencyReport(
        cycles=cycles,
        cycle_count=len(cycles),
        entity_count=len(entities),
    )
@dataclass
class CoverageReport:
    """Results from the C2 (coverage completeness) analysis."""

    coverage_ratio: float = 0.0
    empty_cells: List[dict] = field(default_factory=list)
    gap_concepts: List[dict] = field(default_factory=list)
    domain_counts: Dict[str, int] = field(default_factory=dict)
    entity_count: int = 0

    def to_dict(self) -> dict:
        """Return a plain dict tagged ``"concern": "C2"``.

        Gap concepts are summarised by their count only; the ratio is
        rounded to four decimal places.
        """
        return {
            "concern": "C2",
            "coverage_ratio": round(self.coverage_ratio, 4),
            "empty_cells": self.empty_cells,
            "gap_concepts_count": len(self.gap_concepts),
            "domain_counts": self.domain_counts,
            "entity_count": self.entity_count,
        }


def _extract_attributes(entity: EntityMeta) -> set[str]:
    """Build the ``domain:…`` / ``chapter:…`` FCA attributes of an entity.

    Empty ``domain`` or ``source_chapter`` values contribute nothing.
    """
    attrs: set[str] = set()
    if entity.domain:
        attrs.add(f"domain:{entity.domain}")
    if entity.source_chapter:
        attrs.add(f"chapter:{entity.source_chapter}")
    return attrs


def check_coverage(
    entities: List[EntityMeta],
    extra_attributes: Optional[Dict[str, set[str]]] = None,
) -> CoverageReport:
    """Check coverage completeness using FCA gap analysis.

    Args:
        entities: Entity metadata list.
        extra_attributes: Optional ``{slug: {attr, ...}}`` to merge
            with auto-extracted attributes (e.g. VSM mappings).

    Returns:
        :class:`CoverageReport` with gaps and coverage ratio. For an
        empty entity list a default (all-zero) report is returned. When
        no domain × chapter grid can be formed (no ``domain:`` or no
        ``chapter:`` attribute at all) there are no cells to miss and
        the ratio is vacuously ``1.0``.
    """
    n = len(entities)
    if n == 0:
        return CoverageReport()

    # One pass: entity → attributes mapping and per-domain tallies.
    entity_attrs: Dict[str, set[str]] = {}
    domain_counts: Dict[str, int] = {}
    for e in entities:
        attrs = _extract_attributes(e)
        if extra_attributes and e.slug in extra_attributes:
            attrs.update(extra_attributes[e.slug])
        entity_attrs[e.slug] = attrs

        label = e.domain or "(unspecified)"
        domain_counts[label] = domain_counts.get(label, 0) + 1

    context = FormalContext.from_dict(entity_attrs)

    # Cross-tabulation axes: every domain / chapter attribute in use.
    all_attrs: set[str] = set().union(*entity_attrs.values())
    domains = sorted(a for a in all_attrs if a.startswith("domain:"))
    chapters = sorted(a for a in all_attrs if a.startswith("chapter:"))

    empty: List[dict] = []
    if domains and chapters:
        empty = [
            {"dimension_a": a, "dimension_b": b}
            for a, b in find_empty_cells(context, domains, chapters)
        ]

    gap_dicts = [
        {"intent": sorted(g.intent), "extent_size": g.extent_size}
        for g in find_gap_concepts(context)
        if g.intent_size <= 4  # Only report manageable gaps
    ]

    # Coverage ratio: populated cells / total possible cells.
    if domains and chapters:
        total_cells = len(domains) * len(chapters)
        ratio = (total_cells - len(empty)) / total_cells
    else:
        ratio = 1.0  # no grid → nothing can be missing

    return CoverageReport(
        coverage_ratio=ratio,
        empty_cells=empty,
        gap_concepts=gap_dicts,
        domain_counts=domain_counts,
        entity_count=n,
    )
@dataclass
class GranularityReport:
    """Results from the C5 (granularity balance) analysis."""

    domain_entropy: float = 0.0
    word_count_stats: Dict[str, float] = field(default_factory=dict)
    domain_distribution: Dict[str, int] = field(default_factory=dict)
    entity_count: int = 0

    def to_dict(self) -> dict:
        """Return a plain dict tagged ``"concern": "C5"``.

        Entropy is rounded to four decimals, word-count stats to two.
        """
        return {
            "concern": "C5",
            "domain_entropy": round(self.domain_entropy, 4),
            "word_count_stats": {
                k: round(v, 2) for k, v in self.word_count_stats.items()
            },
            "domain_distribution": self.domain_distribution,
            "entity_count": self.entity_count,
        }


def _shannon_entropy(counts: Dict[str, int]) -> float:
    """Compute the Shannon entropy (base 2) of a count distribution.

    Returns ``0.0`` for an empty or all-zero distribution; zero counts
    are skipped so ``log2(0)`` is never evaluated.
    """
    total = sum(counts.values())
    if total == 0:
        return 0.0
    entropy = 0.0
    for count in counts.values():
        if count > 0:
            p = count / total
            entropy -= p * math.log2(p)
    return entropy


def check_granularity(entities: List[EntityMeta]) -> GranularityReport:
    """Check granularity balance across entities.

    Metrics:
        - Domain entropy: higher = more balanced distribution.
        - Word count statistics: mean, min, max and population
          standard deviation of ``definition_word_count``.

    Args:
        entities: Entity metadata list. Entities with an empty domain
            are counted under ``"(unspecified)"``; a missing (``None``)
            word count is treated as 0.

    Returns:
        :class:`GranularityReport` with balance metrics, or a default
        report when ``entities`` is empty.
    """
    n = len(entities)
    if n == 0:
        return GranularityReport()

    domain_counts: Dict[str, int] = {}
    for e in entities:
        label = e.domain or "(unspecified)"
        domain_counts[label] = domain_counts.get(label, 0) + 1

    # n > 0 here, so the list is never empty and the divisions are safe.
    word_counts = [e.definition_word_count or 0 for e in entities]
    mean_wc = sum(word_counts) / n
    variance = sum((wc - mean_wc) ** 2 for wc in word_counts) / n

    return GranularityReport(
        domain_entropy=_shannon_entropy(domain_counts),
        word_count_stats={
            "mean": mean_wc,
            "min": float(min(word_counts)),
            "max": float(max(word_counts)),
            "std": math.sqrt(variance),
        },
        domain_distribution=domain_counts,
        entity_count=n,
    )
@dataclass
class CheckReport:
    """Unified report from all five collection-level checks.

    A field left as ``None`` means that check was not run.
    """

    redundancy: Optional[RedundancyReport] = None
    coverage: Optional[CoverageReport] = None
    coherence: Optional[CoherenceReport] = None
    consistency: Optional[ConsistencyReport] = None
    granularity: Optional[GranularityReport] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialise every check that was run, keyed by check name."""
        sections = (
            ("redundancy", self.redundancy),
            ("coverage", self.coverage),
            ("coherence", self.coherence),
            ("consistency", self.consistency),
            ("granularity", self.granularity),
        )
        # Explicit None test: presence, not truthiness, decides inclusion.
        return {
            name: section.to_dict()
            for name, section in sections
            if section is not None
        }

    def metrics(self) -> Dict[str, float]:
        """Extract key metrics for viability checking.

        Only metrics of checks that were run are present.
        """
        m: Dict[str, float] = {}
        if self.redundancy is not None:
            m["redundancy_ratio"] = self.redundancy.redundancy_ratio
        if self.coverage is not None:
            m["coverage_ratio"] = self.coverage.coverage_ratio
        if self.coherence is not None:
            m["coherence_components"] = float(self.coherence.connected_components)
            m["modularity"] = self.coherence.modularity
        if self.consistency is not None:
            m["consistency_cycles"] = float(self.consistency.cycle_count)
        if self.granularity is not None:
            m["granularity_entropy"] = self.granularity.domain_entropy
        return m


def run_all_checks(
    entities: List[EntityMeta],
    embeddings: Optional[Dict[str, list[float]]] = None,
    graph: Optional[DependencyGraph] = None,
    extra_attributes: Optional[Dict[str, set[str]]] = None,
    checks: Optional[List[str]] = None,
) -> CheckReport:
    """Run all (or selected) collection-level checks.

    Args:
        entities: Entity metadata list.
        embeddings: Pre-computed embedding vectors for C1.
        graph: Entity relationship graph for C3 and C4.
        extra_attributes: Extra FCA attributes for C2.
        checks: Check names to run. If ``None``, runs all five; an empty
            list runs none. Valid names: ``redundancy``, ``coverage``,
            ``coherence``, ``consistency``, ``granularity``. Unknown
            names are skipped with a warning.

    Returns:
        :class:`CheckReport` with results from each check that ran.
    """
    valid = ("redundancy", "coverage", "coherence", "consistency", "granularity")

    if checks is None:
        selected = set(valid)
    else:
        selected = set(checks)
        unknown = sorted(str(name) for name in selected.difference(valid))
        if unknown:
            # A typo would otherwise yield a silently empty report.
            import warnings

            warnings.warn(
                f"Ignoring unknown check name(s): {', '.join(unknown)}",
                stacklevel=2,
            )

    report = CheckReport()

    if "redundancy" in selected:
        report.redundancy = check_redundancy(entities, embeddings=embeddings)

    if "coverage" in selected:
        report.coverage = check_coverage(entities, extra_attributes=extra_attributes)

    if "coherence" in selected:
        report.coherence = check_coherence(graph=graph, entity_count=len(entities))

    if "consistency" in selected:
        report.consistency = check_consistency(entities, graph=graph)

    if "granularity" in selected:
        report.granularity = check_granularity(entities)

    return report
@dataclass
class RedundancyReport:
    """Results from the C1 (redundancy) analysis."""

    similar_pairs: List[dict] = field(default_factory=list)
    redundancy_ratio: float = 0.0
    entity_count: int = 0

    def to_dict(self) -> dict:
        """Return a plain dict tagged ``"concern": "C1"``.

        The ratio is rounded to four decimal places.
        """
        return {
            "concern": "C1",
            "redundancy_ratio": round(self.redundancy_ratio, 4),
            "similar_pairs": self.similar_pairs,
            "entity_count": self.entity_count,
        }


def check_redundancy(
    entities: List[EntityMeta],
    embeddings: Optional[Dict[str, list[float]]] = None,
    threshold: float = 0.85,
) -> RedundancyReport:
    """Check for redundant entities using embedding similarity.

    Args:
        entities: Entity metadata list.
        embeddings: Pre-computed ``{slug: vector}`` mapping. Vectors for
            slugs that are not in ``entities`` are ignored. If ``None``
            or empty, redundancy is estimated from the overlap of the
            words in the entities' definitions.
        threshold: Similarity threshold for flagging pairs.

    Returns:
        :class:`RedundancyReport` with similar pairs and
        ``redundancy_ratio``, the fraction of entities that appear in at
        least one flagged pair. Fewer than two entities yields an empty
        report.
    """
    n = len(entities)
    if n < 2:
        return RedundancyReport(entity_count=n)

    pairs: list[dict] = []

    if embeddings:
        # Keep only vectors of known entities: a foreign slug would
        # otherwise be counted as "involved" and push the ratio above 1.
        known = {e.slug for e in entities}
        scoped = {slug: vec for slug, vec in embeddings.items() if slug in known}
        raw_pairs = (
            find_similar_pairs(scoped, threshold=threshold)
            if len(scoped) >= 2
            else []
        )
        for slug_a, slug_b, sim in raw_pairs:
            pairs.append({
                "entity_a": slug_a,
                "entity_b": slug_b,
                "similarity": round(sim, 4),
                "method": "embedding",
            })
    else:
        # Fallback: structural overlap (shared definition words).
        from itertools import combinations

        slug_to_words = {
            e.slug: set(e.definition.lower().split()) if e.definition else set()
            for e in entities
        }
        for a, b in combinations(sorted(slug_to_words), 2):
            wa, wb = slug_to_words[a], slug_to_words[b]
            if not (wa and wb):
                continue  # an empty definition cannot overlap with anything
            # Overlap coefficient: shared words over the smaller word set.
            overlap = len(wa & wb) / min(len(wa), len(wb))
            if overlap >= threshold:
                pairs.append({
                    "entity_a": a,
                    "entity_b": b,
                    "similarity": round(overlap, 4),
                    "method": "word_overlap",
                })

    # redundancy_ratio: fraction of entities involved in similar pairs.
    involved = set()
    for p in pairs:
        involved.add(p["entity_a"])
        involved.add(p["entity_b"])

    return RedundancyReport(
        similar_pairs=pairs,
        redundancy_ratio=len(involved) / n,  # n >= 2 here
        entity_count=n,
    )
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option(
    "--concern", "concerns", multiple=True,
    type=click.Choice(["redundancy", "coverage", "coherence", "consistency", "granularity"]),
    help="Run specific concern(s). Omit to run all five.",
)
@click.option("--json", "as_json", is_flag=True, help="Output results as JSON.")
def check(config_path: Optional[str], concerns: tuple, as_json: bool):
    """Run collection-level quality checks (C1–C5).

    Loads the infospace config, parses the entities directory and prints
    the check report either as JSON (``--json``) or as indented text
    followed by a metrics summary. Exits with status 1 when the entities
    directory does not exist.
    """
    cfg, cfg_path = _load_config_or_exit(config_path)
    root = cfg_path.parent

    entities_dir = root / cfg.entities_dir
    if not entities_dir.is_dir():
        click.echo("Error: No entities directory found.", err=True)
        raise SystemExit(1)

    entity_list = parse_entity_directory(entities_dir)
    if not entity_list:
        if as_json:
            # Keep stdout parseable for JSON consumers: an empty report.
            click.echo("{}")
        else:
            click.echo("No entities to check.")
        return

    from markitect.infospace.checks import run_all_checks

    # NOTE(review): no graph or embeddings are passed, so coherence and
    # consistency run without a graph (zero metrics / no cycles) and
    # redundancy uses its non-embedding fallback — confirm this is intended.
    report = run_all_checks(
        entities=entity_list,
        checks=list(concerns) if concerns else None,
    )

    if as_json:
        import json

        click.echo(json.dumps(report.to_dict(), indent=2))
        return

    click.echo(f"Collection checks — {len(entity_list)} entities\n")
    for concern_name, concern_data in report.to_dict().items():
        label = concern_data.get("concern", concern_name.upper())
        click.echo(f" {label} — {concern_name}")
        for k, v in concern_data.items():
            if k == "concern":
                continue
            click.echo(f" {k}: {v}")
        click.echo()

    # Show summary metrics (text mode only).
    m = report.metrics()
    if m:
        click.echo("Metrics summary:")
        for k, v in sorted(m.items()):
            click.echo(f" {k}: {v:.4f}")
+""" + +from __future__ import annotations + +import math + +import pytest + +from markitect.infospace.models import EntityMeta +from markitect.prompts.dependencies.models import DependencyGraph + + +# ── helpers ────────────────────────────────────────────────────────── + + +def _entity(slug: str, domain: str = "", definition: str = "", + source_chapter: str = "", word_count: int = 0) -> EntityMeta: + wc = word_count if word_count else (len(definition.split()) if definition else 0) + return EntityMeta( + slug=slug, + title=slug.replace("-", " ").title(), + h1_raw=slug.replace("-", " ").title(), + definition=definition, + domain=domain, + source_chapter=source_chapter, + definition_word_count=wc, + total_word_count=wc, + ) + + +def _sample_entities() -> list[EntityMeta]: + return [ + _entity("alpha", domain="economics", definition="the first concept in our model", source_chapter="ch01"), + _entity("beta", domain="economics", definition="the second concept about markets", source_chapter="ch01"), + _entity("gamma", domain="sociology", definition="a social structure framework", source_chapter="ch02"), + _entity("delta", domain="sociology", definition="a social dynamic pattern", source_chapter="ch02"), + _entity("epsilon", domain="philosophy", definition="an epistemic principle", source_chapter="ch03"), + ] + + +def _linear_graph() -> DependencyGraph: + """A -> B -> C -> D.""" + g = DependencyGraph() + g.add_edge("A", "B") + g.add_edge("B", "C") + g.add_edge("C", "D") + return g + + +def _cyclic_graph() -> DependencyGraph: + """A -> B -> C -> A (one cycle).""" + g = DependencyGraph() + g.add_edge("A", "B") + g.add_edge("B", "C") + g.add_edge("C", "A") + return g + + +def _can_import_graph_analysis(): + try: + from markitect.analysis.graph import connected_components # noqa: F401 + return True + except ImportError: + return False + + +# ── C1: Redundancy ────────────────────────────────────────────────── + + +class TestRedundancy: + def test_empty_entities(self): + from 
markitect.infospace.checks.redundancy import check_redundancy + report = check_redundancy([]) + assert report.entity_count == 0 + assert report.redundancy_ratio == 0.0 + assert report.similar_pairs == [] + + def test_single_entity(self): + from markitect.infospace.checks.redundancy import check_redundancy + report = check_redundancy([_entity("a", definition="hello world")]) + assert report.entity_count == 1 + assert report.redundancy_ratio == 0.0 + + def test_no_overlap_word_fallback(self): + from markitect.infospace.checks.redundancy import check_redundancy + entities = [ + _entity("a", definition="apple banana cherry"), + _entity("b", definition="delta epsilon zeta"), + ] + report = check_redundancy(entities, threshold=0.5) + assert report.similar_pairs == [] + assert report.redundancy_ratio == 0.0 + + def test_high_overlap_word_fallback(self): + from markitect.infospace.checks.redundancy import check_redundancy + entities = [ + _entity("a", definition="the quick brown fox"), + _entity("b", definition="the quick brown dog"), + ] + report = check_redundancy(entities, threshold=0.5) + assert len(report.similar_pairs) == 1 + assert report.similar_pairs[0]["method"] == "word_overlap" + assert report.similar_pairs[0]["entity_a"] == "a" + assert report.similar_pairs[0]["entity_b"] == "b" + assert report.redundancy_ratio == 1.0 # both entities involved + + def test_embedding_based(self): + from markitect.infospace.checks.redundancy import check_redundancy + entities = [ + _entity("a", definition="x"), + _entity("b", definition="y"), + _entity("c", definition="z"), + ] + # a and b are very similar; c is different + embeddings = { + "a": [1.0, 0.0, 0.0], + "b": [0.99, 0.1, 0.0], + "c": [0.0, 0.0, 1.0], + } + report = check_redundancy(entities, embeddings=embeddings, threshold=0.9) + assert len(report.similar_pairs) >= 1 + assert report.similar_pairs[0]["method"] == "embedding" + assert report.redundancy_ratio > 0.0 + + def test_to_dict(self): + from 
markitect.infospace.checks.redundancy import RedundancyReport + r = RedundancyReport(similar_pairs=[], redundancy_ratio=0.25, entity_count=10) + d = r.to_dict() + assert d["concern"] == "C1" + assert d["redundancy_ratio"] == 0.25 + assert d["entity_count"] == 10 + + +# ── C2: Coverage ──────────────────────────────────────────────────── + + +class TestCoverage: + def test_empty_entities(self): + from markitect.infospace.checks.coverage import check_coverage + report = check_coverage([]) + assert report.entity_count == 0 + assert report.coverage_ratio == 0.0 + + def test_full_coverage(self): + """All domain×chapter cells are populated.""" + from markitect.infospace.checks.coverage import check_coverage + entities = [ + _entity("a", domain="d1", source_chapter="ch1"), + _entity("b", domain="d2", source_chapter="ch1"), + _entity("c", domain="d1", source_chapter="ch2"), + _entity("d", domain="d2", source_chapter="ch2"), + ] + report = check_coverage(entities) + assert report.coverage_ratio == 1.0 + assert report.empty_cells == [] + + def test_partial_coverage(self): + """One cell is missing → coverage < 1.0.""" + from markitect.infospace.checks.coverage import check_coverage + entities = [ + _entity("a", domain="d1", source_chapter="ch1"), + _entity("b", domain="d2", source_chapter="ch1"), + _entity("c", domain="d1", source_chapter="ch2"), + # Missing: d2×ch2 + ] + report = check_coverage(entities) + assert report.coverage_ratio < 1.0 + assert len(report.empty_cells) == 1 + assert report.empty_cells[0]["dimension_a"] == "domain:d2" + assert report.empty_cells[0]["dimension_b"] == "chapter:ch2" + + def test_domain_counts(self): + from markitect.infospace.checks.coverage import check_coverage + entities = _sample_entities() + report = check_coverage(entities) + assert report.domain_counts["economics"] == 2 + assert report.domain_counts["sociology"] == 2 + assert report.domain_counts["philosophy"] == 1 + + def test_to_dict(self): + from markitect.infospace.checks.coverage 
import CoverageReport + r = CoverageReport(coverage_ratio=0.75, entity_count=8) + d = r.to_dict() + assert d["concern"] == "C2" + assert d["coverage_ratio"] == 0.75 + + def test_extra_attributes(self): + from markitect.infospace.checks.coverage import check_coverage + entities = [ + _entity("a", domain="d1", source_chapter="ch1"), + ] + extra = {"a": {"vsm:production"}} + report = check_coverage(entities, extra_attributes=extra) + assert report.entity_count == 1 + + +# ── C3: Coherence ─────────────────────────────────────────────────── + + +class TestCoherence: + def test_no_graph(self): + from markitect.infospace.checks.coherence import check_coherence + report = check_coherence(graph=None, entity_count=5) + assert report.connected_components == 0 + assert report.entity_count == 5 + + def test_empty_graph(self): + from markitect.infospace.checks.coherence import check_coherence + g = DependencyGraph() + report = check_coherence(graph=g, entity_count=0) + assert report.connected_components == 0 + + def test_to_dict(self): + from markitect.infospace.checks.coherence import CoherenceReport + r = CoherenceReport(connected_components=2, modularity=0.3456, entity_count=10) + d = r.to_dict() + assert d["concern"] == "C3" + assert d["modularity"] == 0.3456 + assert d["connected_components"] == 2 + + @pytest.mark.skipif( + not _can_import_graph_analysis(), + reason="networkx not available", + ) + def test_with_graph(self): + from markitect.infospace.checks.coherence import check_coherence + g = _linear_graph() + report = check_coherence(graph=g, entity_count=4) + assert report.connected_components >= 1 + assert report.entity_count == 4 + + +# ── C4: Consistency ───────────────────────────────────────────────── + + +class TestConsistency: + def test_no_graph(self): + from markitect.infospace.checks.consistency import check_consistency + entities = _sample_entities() + report = check_consistency(entities) + assert report.cycle_count == 0 + assert report.entity_count == 5 + 
+ def test_acyclic_graph(self): + from markitect.infospace.checks.consistency import check_consistency + entities = _sample_entities() + g = _linear_graph() + report = check_consistency(entities, graph=g) + assert report.cycle_count == 0 + + def test_cyclic_graph(self): + from markitect.infospace.checks.consistency import check_consistency + entities = _sample_entities() + g = _cyclic_graph() + report = check_consistency(entities, graph=g) + assert report.cycle_count >= 1 + assert len(report.cycles) >= 1 + + def test_to_dict(self): + from markitect.infospace.checks.consistency import ConsistencyReport + r = ConsistencyReport(cycles=[["A", "B", "A"]], cycle_count=1, entity_count=5) + d = r.to_dict() + assert d["concern"] == "C4" + assert d["cycle_count"] == 1 + + +# ── C5: Granularity ───────────────────────────────────────────────── + + +class TestGranularity: + def test_empty_entities(self): + from markitect.infospace.checks.granularity import check_granularity + report = check_granularity([]) + assert report.entity_count == 0 + assert report.domain_entropy == 0.0 + + def test_single_domain(self): + from markitect.infospace.checks.granularity import check_granularity + entities = [ + _entity("a", domain="d1", word_count=10), + _entity("b", domain="d1", word_count=20), + ] + report = check_granularity(entities) + assert report.domain_entropy == 0.0 # single domain = zero entropy + assert report.entity_count == 2 + assert report.word_count_stats["mean"] == 15.0 + + def test_balanced_domains(self): + from markitect.infospace.checks.granularity import check_granularity + entities = [ + _entity("a", domain="d1", word_count=10), + _entity("b", domain="d2", word_count=10), + ] + report = check_granularity(entities) + assert report.domain_entropy == pytest.approx(1.0) # log2(2) = 1.0 + assert report.domain_distribution == {"d1": 1, "d2": 1} + + def test_word_count_stats(self): + from markitect.infospace.checks.granularity import check_granularity + entities = [ + 
_entity("a", domain="d1", word_count=10), + _entity("b", domain="d1", word_count=30), + ] + report = check_granularity(entities) + assert report.word_count_stats["mean"] == 20.0 + assert report.word_count_stats["min"] == 10.0 + assert report.word_count_stats["max"] == 30.0 + assert report.word_count_stats["std"] == 10.0 + + def test_to_dict(self): + from markitect.infospace.checks.granularity import GranularityReport + r = GranularityReport(domain_entropy=1.5, entity_count=4) + d = r.to_dict() + assert d["concern"] == "C5" + assert d["domain_entropy"] == 1.5 + + def test_unspecified_domain(self): + from markitect.infospace.checks.granularity import check_granularity + entities = [_entity("a", domain="", word_count=10)] + report = check_granularity(entities) + assert "(unspecified)" in report.domain_distribution + + +# ── Orchestrator ──────────────────────────────────────────────────── + + +class TestOrchestrator: + def test_run_all_default(self): + from markitect.infospace.checks.orchestrator import run_all_checks + entities = _sample_entities() + report = run_all_checks(entities) + assert report.redundancy is not None + assert report.coverage is not None + assert report.coherence is not None + assert report.consistency is not None + assert report.granularity is not None + + def test_run_selected_checks(self): + from markitect.infospace.checks.orchestrator import run_all_checks + entities = _sample_entities() + report = run_all_checks(entities, checks=["redundancy", "granularity"]) + assert report.redundancy is not None + assert report.granularity is not None + assert report.coverage is None + assert report.coherence is None + assert report.consistency is None + + def test_to_dict(self): + from markitect.infospace.checks.orchestrator import run_all_checks + entities = _sample_entities() + report = run_all_checks(entities, checks=["granularity"]) + d = report.to_dict() + assert "granularity" in d + assert "redundancy" not in d + + def test_metrics(self): + from 
markitect.infospace.checks.orchestrator import run_all_checks + entities = _sample_entities() + report = run_all_checks(entities, checks=["redundancy", "granularity"]) + m = report.metrics() + assert "redundancy_ratio" in m + assert "granularity_entropy" in m + assert isinstance(m["redundancy_ratio"], float) + assert isinstance(m["granularity_entropy"], float) + + def test_metrics_empty_report(self): + from markitect.infospace.checks.orchestrator import CheckReport + report = CheckReport() + assert report.metrics() == {} + + def test_run_all_with_graph(self): + from markitect.infospace.checks.orchestrator import run_all_checks + entities = _sample_entities() + g = _linear_graph() + report = run_all_checks(entities, graph=g, checks=["consistency"]) + assert report.consistency is not None + assert report.consistency.cycle_count == 0 + + def test_run_all_with_cyclic_graph(self): + from markitect.infospace.checks.orchestrator import run_all_checks + entities = _sample_entities() + g = _cyclic_graph() + report = run_all_checks(entities, graph=g, checks=["consistency"]) + assert report.consistency.cycle_count >= 1 + + +# ── Shannon entropy helper ────────────────────────────────────────── + + +class TestShannonEntropy: + def test_uniform_distribution(self): + from markitect.infospace.checks.granularity import _shannon_entropy + counts = {"a": 1, "b": 1, "c": 1, "d": 1} + assert _shannon_entropy(counts) == pytest.approx(2.0) # log2(4) + + def test_single_element(self): + from markitect.infospace.checks.granularity import _shannon_entropy + assert _shannon_entropy({"a": 10}) == 0.0 + + def test_empty(self): + from markitect.infospace.checks.granularity import _shannon_entropy + assert _shannon_entropy({}) == 0.0 + + def test_skewed(self): + from markitect.infospace.checks.granularity import _shannon_entropy + counts = {"a": 99, "b": 1} + entropy = _shannon_entropy(counts) + assert 0.0 < entropy < 1.0