From b20fe4db68670009fc463f9efb3489a717439a13 Mon Sep 17 00:00:00 2001 From: tegwick Date: Thu, 19 Feb 2026 01:44:14 +0100 Subject: [PATCH] feat(infospace): add infospace configuration model and state (S2.1) InfospaceConfig (topic, disciplines, schemas, competency questions, viability thresholds, pipeline) with YAML load/save and directory discovery. InfospaceState aggregates entities, evaluations, and viability checks for status reporting. Co-Authored-By: Claude Opus 4.6 --- markitect/infospace/__init__.py | 32 +++ markitect/infospace/config.py | 309 +++++++++++++++++++++ markitect/infospace/state.py | 141 ++++++++++ tests/unit/infospace/test_config.py | 400 ++++++++++++++++++++++++++++ 4 files changed, 882 insertions(+) create mode 100644 markitect/infospace/config.py create mode 100644 markitect/infospace/state.py create mode 100644 tests/unit/infospace/test_config.py diff --git a/markitect/infospace/__init__.py b/markitect/infospace/__init__.py index addcfcd5..c2e461af 100644 --- a/markitect/infospace/__init__.py +++ b/markitect/infospace/__init__.py @@ -39,6 +39,23 @@ from .evaluation_io import ( write_entity_evaluation, write_snapshot, ) +from .config import ( + DisciplineBinding, + InfospaceConfig, + PipelineConfig, + PipelineStage, + SchemaRegistry, + TopicConfig, + ViabilityThreshold, + find_infospace_config, + load_infospace_config, + save_infospace_config, +) +from .state import ( + InfospaceState, + ViabilityResult, + build_state, +) __all__ = [ "EntityMeta", @@ -72,4 +89,19 @@ __all__ = [ "read_snapshot", "write_entity_evaluation", "write_snapshot", + # Config + "DisciplineBinding", + "InfospaceConfig", + "PipelineConfig", + "PipelineStage", + "SchemaRegistry", + "TopicConfig", + "ViabilityThreshold", + "find_infospace_config", + "load_infospace_config", + "save_infospace_config", + # State + "InfospaceState", + "ViabilityResult", + "build_state", ] diff --git a/markitect/infospace/config.py b/markitect/infospace/config.py new file mode 100644 index 00000000..ba184ca4 --- /dev/null +++ b/markitect/infospace/config.py @@ -0,0 +1,309 @@ +""" +Infospace configuration model and YAML loader. + +An infospace is declared via an ``infospace.yaml`` file that specifies +its topic, disciplines, schemas, competency questions, and viability +thresholds. This module provides the data models and I/O for that +configuration. + +Example ``infospace.yaml``:: + + topic: + name: "The Wealth of Nations" + domain: "Classical Economics" + sources: artifacts/sources/ + + disciplines: + - name: "Viable System Model" + path: artifacts/vsm-reference/ + + schemas: + entity: schemas/economic-entity-schema-v1.0.md + + competency_questions: schemas/competency-questions.md + + viability: + coverage_ratio: { min: 0.60 } + per_entity_mean: { min: 3.5 } +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Dict, List, Optional + +import yaml + + +@dataclass +class TopicConfig: + """The subject matter an infospace explains. + + Attributes: + name: Human-readable topic name. + domain: Broader knowledge domain. + sources: Path (relative to infospace root) to source material. + """ + + name: str + domain: str = "" + sources: str = "" + + def to_dict(self) -> Dict[str, Any]: + d: Dict[str, Any] = {"name": self.name} + if self.domain: + d["domain"] = self.domain + if self.sources: + d["sources"] = self.sources + return d + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> TopicConfig: + return cls( + name=data["name"], + domain=data.get("domain", ""), + sources=data.get("sources", ""), + ) + + +@dataclass +class DisciplineBinding: + """An external infospace applied as an analytical lens. + + Attributes: + name: Human-readable discipline name. + path: Path to the discipline infospace (relative to root). + """ + + name: str + path: str = "" + + def to_dict(self) -> Dict[str, Any]: + d: Dict[str, Any] = {"name": self.name} + if self.path: + d["path"] = self.path + return d + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> DisciplineBinding: + return cls(name=data["name"], path=data.get("path", "")) + + +@dataclass +class SchemaRegistry: + """Schema paths governing entity and document structure. + + All paths are relative to the infospace root directory. + """ + + entity: str = "" + mapping: str = "" + analysis: str = "" + extra: Dict[str, str] = field(default_factory=dict) + + def to_dict(self) -> Dict[str, Any]: + d: Dict[str, Any] = {} + if self.entity: + d["entity"] = self.entity + if self.mapping: + d["mapping"] = self.mapping + if self.analysis: + d["analysis"] = self.analysis + d.update(self.extra) + return d + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> SchemaRegistry: + known = {"entity", "mapping", "analysis"} + extra = {k: v for k, v in data.items() if k not in known} + return cls( + entity=data.get("entity", ""), + mapping=data.get("mapping", ""), + analysis=data.get("analysis", ""), + extra=extra, + ) + + +@dataclass +class ViabilityThreshold: + """Threshold for a single viability metric. + + At least one of *min* or *max* should be set. + """ + + metric: str + min: Optional[float] = None + max: Optional[float] = None + + def check(self, value: float) -> bool: + """Return ``True`` if *value* is within the threshold.""" + if self.min is not None and value < self.min: + return False + if self.max is not None and value > self.max: + return False + return True + + def to_dict(self) -> Dict[str, Any]: + d: Dict[str, Any] = {} + if self.min is not None: + d["min"] = self.min + if self.max is not None: + d["max"] = self.max + return d + + +@dataclass +class PipelineStage: + """A single stage in the processing pipeline.""" + + template: str + spaces: List[str] = field(default_factory=list) + + def to_dict(self) -> Dict[str, Any]: + d: Dict[str, Any] = {"template": self.template} + if self.spaces: + d["spaces"] = self.spaces + return d + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> PipelineStage: + return cls( + template=data["template"], + spaces=data.get("spaces", []), + ) + + +@dataclass +class PipelineConfig: + """Processing pipeline configuration.""" + + stages: List[PipelineStage] = field(default_factory=list) + post_batch: List[PipelineStage] = field(default_factory=list) + + def to_dict(self) -> Dict[str, Any]: + d: Dict[str, Any] = {} + if self.stages: + d["stages"] = [s.to_dict() for s in self.stages] + if self.post_batch: + d["post_batch"] = [s.to_dict() for s in self.post_batch] + return d + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> PipelineConfig: + return cls( + stages=[PipelineStage.from_dict(s) for s in data.get("stages", [])], + post_batch=[PipelineStage.from_dict(s) for s in data.get("post_batch", [])], + ) + + +@dataclass +class InfospaceConfig: + """Complete infospace configuration, loaded from ``infospace.yaml``. + + This is the declarative description of an infospace: what it + explains, through which lenses, governed by which schemas, and + what quality thresholds it must meet. + """ + + topic: TopicConfig + disciplines: List[DisciplineBinding] = field(default_factory=list) + schemas: SchemaRegistry = field(default_factory=SchemaRegistry) + competency_questions: str = "" + viability: Dict[str, ViabilityThreshold] = field(default_factory=dict) + pipeline: Optional[PipelineConfig] = None + entities_dir: str = "output/entities" + evaluations_dir: str = "output/evaluations" + metrics_dir: str = "output/metrics" + + def to_dict(self) -> Dict[str, Any]: + d: Dict[str, Any] = {"topic": self.topic.to_dict()} + if self.disciplines: + d["disciplines"] = [db.to_dict() for db in self.disciplines] + schemas_dict = self.schemas.to_dict() + if schemas_dict: + d["schemas"] = schemas_dict + if self.competency_questions: + d["competency_questions"] = self.competency_questions + if self.viability: + d["viability"] = { + name: t.to_dict() for name, t in self.viability.items() + } + if self.pipeline: + d["pipeline"] = self.pipeline.to_dict() + if self.entities_dir != "output/entities": + d["entities_dir"] = self.entities_dir + if self.evaluations_dir != "output/evaluations": + d["evaluations_dir"] = self.evaluations_dir + if self.metrics_dir != "output/metrics": + d["metrics_dir"] = self.metrics_dir + return d + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> InfospaceConfig: + viability_raw = data.get("viability", {}) + viability = { + name: ViabilityThreshold(metric=name, **bounds) + for name, bounds in viability_raw.items() + } + pipeline_raw = data.get("pipeline") + pipeline = PipelineConfig.from_dict(pipeline_raw) if pipeline_raw else None + + return cls( + topic=TopicConfig.from_dict(data["topic"]), + disciplines=[ + DisciplineBinding.from_dict(d) + for d in data.get("disciplines", []) + ], + schemas=SchemaRegistry.from_dict(data.get("schemas", {})), + competency_questions=data.get("competency_questions", ""), + viability=viability, + pipeline=pipeline, + entities_dir=data.get("entities_dir", "output/entities"), + evaluations_dir=data.get("evaluations_dir", "output/evaluations"), + metrics_dir=data.get("metrics_dir", "output/metrics"), + ) + + +def load_infospace_config(path: Path) -> InfospaceConfig: + """Load an :class:`InfospaceConfig` from a YAML file. + + Args: + path: Path to ``infospace.yaml``. + + Raises: + FileNotFoundError: If *path* does not exist. + ValueError: If required fields are missing. + """ + data = yaml.safe_load(path.read_text(encoding="utf-8")) + if not isinstance(data, dict): + raise ValueError(f"Expected a YAML mapping in {path}") + if "topic" not in data: + raise ValueError(f"Missing required 'topic' key in {path}") + return InfospaceConfig.from_dict(data) + + +def save_infospace_config(config: InfospaceConfig, path: Path) -> None: + """Write an :class:`InfospaceConfig` to a YAML file.""" + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text( + yaml.safe_dump( + config.to_dict(), + default_flow_style=False, + sort_keys=False, + ), + encoding="utf-8", + ) + + +def find_infospace_config(start: Optional[Path] = None) -> Optional[Path]: + """Walk up from *start* looking for ``infospace.yaml``. + + Returns the path to the config file, or ``None``. + """ + current = (start or Path.cwd()).resolve() + for directory in [current, *current.parents]: + candidate = directory / "infospace.yaml" + if candidate.is_file(): + return candidate + return None diff --git a/markitect/infospace/state.py b/markitect/infospace/state.py new file mode 100644 index 00000000..17f41c68 --- /dev/null +++ b/markitect/infospace/state.py @@ -0,0 +1,141 @@ +""" +Infospace runtime state. + +Computed from the current entities, evaluations, and metrics on disk. +Provides the data behind ``markitect infospace status`` and +``markitect infospace viability``. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, List, Optional + +from markitect.infospace.config import InfospaceConfig, ViabilityThreshold +from markitect.infospace.models import EntityMeta +from markitect.infospace.evaluation import EvaluationSnapshot + + +@dataclass +class ViabilityResult: + """Result of checking a single viability threshold.""" + + metric: str + value: float + threshold: ViabilityThreshold + passed: bool + + def to_dict(self) -> Dict[str, Any]: + d: Dict[str, Any] = { + "metric": self.metric, + "value": self.value, + "passed": self.passed, + } + if self.threshold.min is not None: + d["min"] = self.threshold.min + if self.threshold.max is not None: + d["max"] = self.threshold.max + return d + + +@dataclass +class InfospaceState: + """Current runtime state of an infospace. + + Aggregates entity metadata, evaluation results, and viability + checks into a single queryable object. + """ + + config: InfospaceConfig + entities: List[EntityMeta] = field(default_factory=list) + latest_snapshot: Optional[EvaluationSnapshot] = None + viability_results: List[ViabilityResult] = field(default_factory=list) + computed_at: datetime = field(default_factory=datetime.utcnow) + + @property + def entity_count(self) -> int: + return len(self.entities) + + @property + def topic_name(self) -> str: + return self.config.topic.name + + @property + def is_viable(self) -> bool: + """``True`` if all viability thresholds are met.""" + if not self.viability_results: + return False + return all(r.passed for r in self.viability_results) + + @property + def viability_pass_count(self) -> int: + return sum(1 for r in self.viability_results if r.passed) + + @property + def viability_total_count(self) -> int: + return len(self.viability_results) + + @property + def domains(self) -> List[str]: + """Distinct domain values across all entities.""" + return sorted({e.domain for e in self.entities if e.domain}) + + @property + def has_evaluations(self) -> bool: + return self.latest_snapshot is not None + + def check_viability(self, metrics: Dict[str, float]) -> List[ViabilityResult]: + """Check *metrics* against the configured viability thresholds. + + Updates :attr:`viability_results` and returns the results. + """ + results: List[ViabilityResult] = [] + for name, threshold in self.config.viability.items(): + value = metrics.get(name, 0.0) + results.append(ViabilityResult( + metric=name, + value=value, + threshold=threshold, + passed=threshold.check(value), + )) + self.viability_results = results + return results + + def summary(self) -> Dict[str, Any]: + """Return a summary dict suitable for display or serialisation.""" + d: Dict[str, Any] = { + "topic": self.topic_name, + "entity_count": self.entity_count, + "domains": self.domains, + "has_evaluations": self.has_evaluations, + } + if self.viability_results: + d["viable"] = self.is_viable + d["viability_pass"] = self.viability_pass_count + d["viability_total"] = self.viability_total_count + if self.latest_snapshot: + d["last_evaluated"] = self.latest_snapshot.created_at.isoformat() + return d + + +def build_state( + config: InfospaceConfig, + entities: Optional[List[EntityMeta]] = None, + snapshot: Optional[EvaluationSnapshot] = None, + metrics: Optional[Dict[str, float]] = None, +) -> InfospaceState: + """Build an :class:`InfospaceState` from available data. + + This is a convenience function that assembles the state object + and optionally runs viability checks if *metrics* are provided. + """ + state = InfospaceState( + config=config, + entities=entities or [], + latest_snapshot=snapshot, + ) + if metrics is not None: + state.check_viability(metrics) + return state diff --git a/tests/unit/infospace/test_config.py b/tests/unit/infospace/test_config.py new file mode 100644 index 00000000..d369f75c --- /dev/null +++ b/tests/unit/infospace/test_config.py @@ -0,0 +1,400 @@ +"""Tests for markitect.infospace.config and state.""" + +from datetime import datetime +from pathlib import Path + +import pytest + +from markitect.infospace.config import ( + DisciplineBinding, + InfospaceConfig, + PipelineConfig, + PipelineStage, + SchemaRegistry, + TopicConfig, + ViabilityThreshold, + find_infospace_config, + load_infospace_config, + save_infospace_config, +) +from markitect.infospace.state import ( + InfospaceState, + ViabilityResult, + build_state, +) +from markitect.infospace.models import EntityMeta +from markitect.infospace.evaluation import ( + EntityEvaluation, + EvaluationSnapshot, + ScoreEntry, +) + + +# ── Helpers ────────────────────────────────────────────────────────── + + +_SAMPLE_YAML = """\ +topic: + name: "The Wealth of Nations" + domain: "Classical Economics" + sources: artifacts/sources/ + +disciplines: + - name: "Viable System Model" + path: artifacts/vsm-reference/ + +schemas: + entity: schemas/economic-entity-schema-v1.0.md + mapping: schemas/vsm-mapping-schema-v1.0.md + +competency_questions: schemas/competency-questions.md + +viability: + coverage_ratio: + min: 0.60 + per_entity_mean: + min: 3.5 + redundancy_ratio: + max: 0.05 + +pipeline: + stages: + - template: extract-entities + spaces: [sources, guidelines] + - template: map-to-vsm + spaces: [entities, vsm-reference] + post_batch: + - template: assess-metrics +""" + + +def _sample_config() -> InfospaceConfig: + return InfospaceConfig( + topic=TopicConfig(name="Test Topic", domain="Testing"), + disciplines=[DisciplineBinding(name="VSM", path="vsm/")], + schemas=SchemaRegistry(entity="schemas/entity.md"), + competency_questions="schemas/cq.md", + viability={ + "coverage_ratio": ViabilityThreshold("coverage_ratio", min=0.6), + "redundancy_ratio": ViabilityThreshold("redundancy_ratio", max=0.05), + }, + ) + + +def _sample_entities(n=5) -> list: + return [ + EntityMeta( + slug=f"entity-{i}", + title=f"Entity {i}", + h1_raw=f"Entity {i}", + domain="Production" if i % 2 == 0 else "Distribution", + ) + for i in range(n) + ] + + +# ── TopicConfig ────────────────────────────────────────────────────── + + +class TestTopicConfig: + def test_round_trip(self): + tc = TopicConfig("WoN", "Economics", "sources/") + d = tc.to_dict() + restored = TopicConfig.from_dict(d) + assert restored.name == "WoN" + assert restored.domain == "Economics" + assert restored.sources == "sources/" + + def test_minimal(self): + tc = TopicConfig.from_dict({"name": "Minimal"}) + assert tc.domain == "" + assert tc.sources == "" + + def test_to_dict_omits_empty(self): + tc = TopicConfig("X") + d = tc.to_dict() + assert "domain" not in d + assert "sources" not in d + + +# ── DisciplineBinding ──────────────────────────────────────────────── + + +class TestDisciplineBinding: + def test_round_trip(self): + db = DisciplineBinding("VSM", "path/to/vsm") + d = db.to_dict() + restored = DisciplineBinding.from_dict(d) + assert restored.name == "VSM" + assert restored.path == "path/to/vsm" + + +# ── SchemaRegistry ─────────────────────────────────────────────────── + + +class TestSchemaRegistry: + def test_round_trip(self): + sr = SchemaRegistry(entity="e.md", mapping="m.md", analysis="a.md") + d = sr.to_dict() + restored = SchemaRegistry.from_dict(d) + assert restored.entity == "e.md" + assert restored.mapping == "m.md" + + def test_extra_schemas(self): + sr = SchemaRegistry.from_dict({"entity": "e.md", "custom": "c.md"}) + assert sr.entity == "e.md" + assert sr.extra == {"custom": "c.md"} + + +# ── ViabilityThreshold ────────────────────────────────────────────── + + +class TestViabilityThreshold: + def test_min_check(self): + t = ViabilityThreshold("x", min=0.5) + assert t.check(0.6) is True + assert t.check(0.5) is True + assert t.check(0.4) is False + + def test_max_check(self): + t = ViabilityThreshold("x", max=0.1) + assert t.check(0.05) is True + assert t.check(0.1) is True + assert t.check(0.2) is False + + def test_min_and_max(self): + t = ViabilityThreshold("x", min=0.3, max=0.7) + assert t.check(0.5) is True + assert t.check(0.2) is False + assert t.check(0.8) is False + + def test_no_bounds_always_passes(self): + t = ViabilityThreshold("x") + assert t.check(999.0) is True + + +# ── PipelineConfig ────────────────────────────────────────────────── + + +class TestPipelineConfig: + def test_round_trip(self): + pc = PipelineConfig( + stages=[PipelineStage("extract", ["s1", "s2"])], + post_batch=[PipelineStage("assess")], + ) + d = pc.to_dict() + restored = PipelineConfig.from_dict(d) + assert len(restored.stages) == 1 + assert restored.stages[0].template == "extract" + assert restored.stages[0].spaces == ["s1", "s2"] + assert len(restored.post_batch) == 1 + + +# ── InfospaceConfig ───────────────────────────────────────────────── + + +class TestInfospaceConfig: + def test_to_dict_from_dict_round_trip(self): + cfg = _sample_config() + d = cfg.to_dict() + restored = InfospaceConfig.from_dict(d) + assert restored.topic.name == "Test Topic" + assert len(restored.disciplines) == 1 + assert restored.schemas.entity == "schemas/entity.md" + assert restored.competency_questions == "schemas/cq.md" + assert len(restored.viability) == 2 + + def test_viability_thresholds_preserved(self): + cfg = _sample_config() + d = cfg.to_dict() + restored = InfospaceConfig.from_dict(d) + assert restored.viability["coverage_ratio"].min == 0.6 + assert restored.viability["redundancy_ratio"].max == 0.05 + + def test_default_dirs(self): + cfg = InfospaceConfig(topic=TopicConfig("X")) + assert cfg.entities_dir == "output/entities" + assert cfg.evaluations_dir == "output/evaluations" + assert cfg.metrics_dir == "output/metrics" + + def test_custom_dirs(self): + cfg = InfospaceConfig.from_dict({ + "topic": {"name": "X"}, + "entities_dir": "custom/entities", + }) + assert cfg.entities_dir == "custom/entities" + + +# ── YAML I/O ──────────────────────────────────────────────────────── + + +class TestYAMLIO: + def test_save_load_round_trip(self, tmp_path): + cfg = _sample_config() + p = tmp_path / "infospace.yaml" + save_infospace_config(cfg, p) + loaded = load_infospace_config(p) + assert loaded.topic.name == cfg.topic.name + assert len(loaded.viability) == len(cfg.viability) + + def test_load_full_example(self, tmp_path): + p = tmp_path / "infospace.yaml" + p.write_text(_SAMPLE_YAML, encoding="utf-8") + cfg = load_infospace_config(p) + assert cfg.topic.name == "The Wealth of Nations" + assert cfg.topic.domain == "Classical Economics" + assert len(cfg.disciplines) == 1 + assert cfg.disciplines[0].name == "Viable System Model" + assert cfg.schemas.entity == "schemas/economic-entity-schema-v1.0.md" + assert cfg.competency_questions == "schemas/competency-questions.md" + assert len(cfg.viability) == 3 + assert cfg.viability["coverage_ratio"].min == 0.60 + assert cfg.viability["redundancy_ratio"].max == 0.05 + assert cfg.pipeline is not None + assert len(cfg.pipeline.stages) == 2 + assert len(cfg.pipeline.post_batch) == 1 + + def test_load_missing_file(self, tmp_path): + with pytest.raises(FileNotFoundError): + load_infospace_config(tmp_path / "nonexistent.yaml") + + def test_load_missing_topic(self, tmp_path): + p = tmp_path / "bad.yaml" + p.write_text("schemas:\n entity: x.md\n") + with pytest.raises(ValueError, match="topic"): + load_infospace_config(p) + + def test_save_creates_parent_dirs(self, tmp_path): + cfg = InfospaceConfig(topic=TopicConfig("X")) + p = tmp_path / "deep" / "nested" / "infospace.yaml" + save_infospace_config(cfg, p) + assert p.exists() + + +class TestFindConfig: + def test_finds_config_in_current_dir(self, tmp_path): + (tmp_path / "infospace.yaml").write_text("topic:\n name: X\n") + found = find_infospace_config(tmp_path) + assert found is not None + assert found.name == "infospace.yaml" + + def test_finds_config_in_parent(self, tmp_path): + (tmp_path / "infospace.yaml").write_text("topic:\n name: X\n") + child = tmp_path / "sub" / "dir" + child.mkdir(parents=True) + found = find_infospace_config(child) + assert found is not None + + def test_returns_none_if_not_found(self, tmp_path): + assert find_infospace_config(tmp_path) is None + + +# ── InfospaceState ────────────────────────────────────────────────── + + +class TestInfospaceState: + def test_entity_count(self): + cfg = _sample_config() + state = InfospaceState(config=cfg, entities=_sample_entities(5)) + assert state.entity_count == 5 + + def test_topic_name(self): + cfg = _sample_config() + state = InfospaceState(config=cfg) + assert state.topic_name == "Test Topic" + + def test_domains(self): + cfg = _sample_config() + state = InfospaceState(config=cfg, entities=_sample_entities(4)) + assert "Production" in state.domains + assert "Distribution" in state.domains + + def test_has_evaluations(self): + cfg = _sample_config() + state = InfospaceState(config=cfg) + assert state.has_evaluations is False + + snap = EvaluationSnapshot( + snapshot_id="s1", + created_at=datetime(2026, 1, 1), + schema_name="Test", + entity_count=0, + ) + state.latest_snapshot = snap + assert state.has_evaluations is True + + +class TestViabilityCheck: + def test_all_pass(self): + cfg = _sample_config() + state = InfospaceState(config=cfg) + metrics = {"coverage_ratio": 0.8, "redundancy_ratio": 0.02} + results = state.check_viability(metrics) + assert all(r.passed for r in results) + assert state.is_viable is True + + def test_one_fails(self): + cfg = _sample_config() + state = InfospaceState(config=cfg) + metrics = {"coverage_ratio": 0.4, "redundancy_ratio": 0.02} + results = state.check_viability(metrics) + assert not all(r.passed for r in results) + assert state.is_viable is False + + def test_missing_metric_defaults_to_zero(self): + cfg = _sample_config() + state = InfospaceState(config=cfg) + # coverage_ratio min=0.6, missing → 0.0 → fails + results = state.check_viability({}) + coverage = next(r for r in results if r.metric == "coverage_ratio") + assert coverage.passed is False + assert coverage.value == 0.0 + + def test_viability_counts(self): + cfg = _sample_config() + state = InfospaceState(config=cfg) + metrics = {"coverage_ratio": 0.8, "redundancy_ratio": 0.2} + state.check_viability(metrics) + assert state.viability_pass_count == 1 # coverage passes + assert state.viability_total_count == 2 + + def test_no_thresholds_not_viable(self): + cfg = InfospaceConfig(topic=TopicConfig("X")) + state = InfospaceState(config=cfg) + assert state.is_viable is False + + +class TestBuildState: + def test_builds_with_entities(self): + cfg = _sample_config() + entities = _sample_entities(3) + state = build_state(cfg, entities=entities) + assert state.entity_count == 3 + + def test_builds_with_metrics(self): + cfg = _sample_config() + metrics = {"coverage_ratio": 0.9, "redundancy_ratio": 0.01} + state = build_state(cfg, metrics=metrics) + assert state.is_viable is True + + def test_summary(self): + cfg = _sample_config() + entities = _sample_entities(3) + metrics = {"coverage_ratio": 0.9, "redundancy_ratio": 0.01} + state = build_state(cfg, entities=entities, metrics=metrics) + s = state.summary() + assert s["topic"] == "Test Topic" + assert s["entity_count"] == 3 + assert s["viable"] is True + + +class TestViabilityResult: + def test_to_dict(self): + t = ViabilityThreshold("x", min=0.5) + r = ViabilityResult(metric="x", value=0.7, threshold=t, passed=True) + d = r.to_dict() + assert d["metric"] == "x" + assert d["value"] == 0.7 + assert d["passed"] is True + assert d["min"] == 0.5 + assert "max" not in d