Files
markitect-main/markitect/infospace/config.py
tegwick b20fe4db68 feat(infospace): add infospace configuration model and state (S2.1)
InfospaceConfig (topic, disciplines, schemas, competency questions,
viability thresholds, pipeline) with YAML load/save and directory
discovery. InfospaceState aggregates entities, evaluations, and
viability checks for status reporting.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 01:44:14 +01:00

310 lines
9.2 KiB
Python

"""
Infospace configuration model and YAML loader.
An infospace is declared via an ``infospace.yaml`` file that specifies
its topic, disciplines, schemas, competency questions, and viability
thresholds. This module provides the data models and I/O for that
configuration.
Example ``infospace.yaml``::
topic:
name: "The Wealth of Nations"
domain: "Classical Economics"
sources: artifacts/sources/
disciplines:
- name: "Viable System Model"
path: artifacts/vsm-reference/
schemas:
entity: schemas/economic-entity-schema-v1.0.md
competency_questions: schemas/competency-questions.md
viability:
coverage_ratio: { min: 0.60 }
per_entity_mean: { min: 3.5 }
"""
from __future__ import annotations
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
import yaml
@dataclass
class TopicConfig:
    """Subject matter that an infospace sets out to explain.

    Attributes:
        name: Human-readable topic name (required).
        domain: Broader knowledge domain the topic belongs to.
        sources: Location of the source material, relative to the
            infospace root.
    """

    name: str
    domain: str = ""
    sources: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain mapping, leaving out empty optional fields."""
        optional = (("domain", self.domain), ("sources", self.sources))
        result: Dict[str, Any] = {"name": self.name}
        result.update((key, value) for key, value in optional if value)
        return result

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> TopicConfig:
        """Build a topic from a mapping; the ``name`` key is mandatory."""
        # Read the required key first so a missing name fails before
        # anything else is looked at.
        topic_name = data["name"]
        return cls(topic_name, data.get("domain", ""), data.get("sources", ""))
@dataclass
class DisciplineBinding:
    """External infospace that is applied as an analytical lens.

    Attributes:
        name: Human-readable discipline name (required).
        path: Location of the discipline infospace, relative to root.
    """

    name: str
    path: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain mapping; ``path`` is omitted when empty."""
        if not self.path:
            return {"name": self.name}
        return {"name": self.name, "path": self.path}

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> DisciplineBinding:
        """Build a binding from a mapping; the ``name`` key is mandatory."""
        discipline_name = data["name"]
        return cls(discipline_name, data.get("path", ""))
@dataclass
class SchemaRegistry:
    """Paths of the schemas that govern entity and document structure.

    Every path is relative to the infospace root directory.  Keys other
    than ``entity``, ``mapping`` and ``analysis`` are kept in ``extra``.
    """

    entity: str = ""
    mapping: str = ""
    analysis: str = ""
    extra: Dict[str, str] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain mapping, skipping empty named schemas."""
        named = (
            ("entity", self.entity),
            ("mapping", self.mapping),
            ("analysis", self.analysis),
        )
        result: Dict[str, Any] = {key: value for key, value in named if value}
        # Extra entries are merged last, so they win over a named key
        # of the same name.
        result.update(self.extra)
        return result

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> SchemaRegistry:
        """Build a registry from a mapping, collecting unknown keys in ``extra``."""
        named = ("entity", "mapping", "analysis")
        leftovers = dict(item for item in data.items() if item[0] not in named)
        values = {key: data.get(key, "") for key in named}
        return cls(extra=leftovers, **values)
@dataclass
class ViabilityThreshold:
    """Acceptable range for one viability metric.

    At least one of *min* or *max* should be set; a bound left as
    ``None`` is not enforced.
    """

    metric: str
    min: Optional[float] = None
    max: Optional[float] = None

    def check(self, value: float) -> bool:
        """Return ``True`` if *value* lies within the configured bounds."""
        if self.min is not None and value < self.min:
            return False
        return self.max is None or not (value > self.max)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the bounds that are set; the metric name is not included."""
        bounds = (("min", self.min), ("max", self.max))
        return {label: bound for label, bound in bounds if bound is not None}
@dataclass
class PipelineStage:
    """One stage of the processing pipeline.

    Attributes:
        template: Template identifier for the stage (required).
        spaces: Names of the spaces associated with the stage.
    """

    template: str
    spaces: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain mapping; ``spaces`` is omitted when empty."""
        if self.spaces:
            return {"template": self.template, "spaces": self.spaces}
        return {"template": self.template}

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> PipelineStage:
        """Build a stage from a mapping; the ``template`` key is mandatory."""
        stage_template = data["template"]
        return cls(stage_template, data.get("spaces", []))
@dataclass
class PipelineConfig:
    """Processing pipeline configuration.

    Attributes:
        stages: Stages of the main pipeline, in order.
        post_batch: Stages listed under the ``post_batch`` key.
    """

    stages: List[PipelineStage] = field(default_factory=list)
    post_batch: List[PipelineStage] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain mapping, omitting empty stage lists."""
        d: Dict[str, Any] = {}
        if self.stages:
            d["stages"] = [s.to_dict() for s in self.stages]
        if self.post_batch:
            d["post_batch"] = [s.to_dict() for s in self.post_batch]
        return d

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> PipelineConfig:
        """Build a pipeline from a mapping.

        A key that is present but null (e.g. a bare ``stages:`` line in
        YAML, which loads as ``None``) is treated like a missing key
        instead of raising ``TypeError`` while iterating.
        """
        # ``.get(key, [])`` only covers absent keys; ``or []`` also
        # covers keys whose value is None.
        stages_raw = data.get("stages") or []
        post_batch_raw = data.get("post_batch") or []
        return cls(
            stages=[PipelineStage.from_dict(s) for s in stages_raw],
            post_batch=[PipelineStage.from_dict(s) for s in post_batch_raw],
        )
@dataclass
class InfospaceConfig:
    """Complete infospace configuration, loaded from ``infospace.yaml``.

    This is the declarative description of an infospace: what it
    explains, through which lenses, governed by which schemas, and
    what quality thresholds it must meet.

    Attributes:
        topic: The subject matter (required).
        disciplines: Discipline bindings applied to the topic.
        schemas: Registry of schema paths.
        competency_questions: Value of the ``competency_questions`` key.
        viability: Thresholds keyed by metric name.
        pipeline: Optional processing pipeline.
        entities_dir: Entities directory (default ``output/entities``).
        evaluations_dir: Evaluations directory
            (default ``output/evaluations``).
        metrics_dir: Metrics directory (default ``output/metrics``).
    """

    topic: TopicConfig
    disciplines: List[DisciplineBinding] = field(default_factory=list)
    schemas: SchemaRegistry = field(default_factory=SchemaRegistry)
    competency_questions: str = ""
    viability: Dict[str, ViabilityThreshold] = field(default_factory=dict)
    pipeline: Optional[PipelineConfig] = None
    entities_dir: str = "output/entities"
    evaluations_dir: str = "output/evaluations"
    metrics_dir: str = "output/metrics"

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain mapping suitable for YAML output.

        Empty sections and directory settings that still hold their
        default value are left out.
        """
        d: Dict[str, Any] = {"topic": self.topic.to_dict()}
        if self.disciplines:
            d["disciplines"] = [db.to_dict() for db in self.disciplines]
        schemas_dict = self.schemas.to_dict()
        if schemas_dict:
            d["schemas"] = schemas_dict
        if self.competency_questions:
            d["competency_questions"] = self.competency_questions
        if self.viability:
            d["viability"] = {
                name: t.to_dict() for name, t in self.viability.items()
            }
        if self.pipeline is not None:
            # A PipelineConfig instance is always truthy, so test the
            # serialized form: an empty pipeline is omitted just like
            # an empty schema registry.  ``from_dict`` reads an empty
            # ``pipeline`` mapping back as None, so nothing is lost.
            pipeline_dict = self.pipeline.to_dict()
            if pipeline_dict:
                d["pipeline"] = pipeline_dict
        if self.entities_dir != "output/entities":
            d["entities_dir"] = self.entities_dir
        if self.evaluations_dir != "output/evaluations":
            d["evaluations_dir"] = self.evaluations_dir
        if self.metrics_dir != "output/metrics":
            d["metrics_dir"] = self.metrics_dir
        return d

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> InfospaceConfig:
        """Build a configuration from a mapping.

        Only ``topic`` is required.  An optional section that is
        present but null (a bare ``viability:`` or ``disciplines:``
        line in YAML loads as ``None``) is treated like a missing
        section.

        Raises:
            KeyError: If ``topic`` is missing.
        """
        # ``.get(key, default)`` only covers absent keys, so ``or`` is
        # used throughout to also cover keys whose value is None.
        viability_raw = data.get("viability") or {}
        viability = {
            # A metric listed without bounds gets an unbounded threshold.
            name: ViabilityThreshold(metric=name, **(bounds or {}))
            for name, bounds in viability_raw.items()
        }
        pipeline_raw = data.get("pipeline")
        pipeline = PipelineConfig.from_dict(pipeline_raw) if pipeline_raw else None
        return cls(
            topic=TopicConfig.from_dict(data["topic"]),
            disciplines=[
                DisciplineBinding.from_dict(d)
                for d in data.get("disciplines") or []
            ],
            schemas=SchemaRegistry.from_dict(data.get("schemas") or {}),
            competency_questions=data.get("competency_questions") or "",
            viability=viability,
            pipeline=pipeline,
            entities_dir=data.get("entities_dir") or "output/entities",
            evaluations_dir=data.get("evaluations_dir") or "output/evaluations",
            metrics_dir=data.get("metrics_dir") or "output/metrics",
        )
def load_infospace_config(path: Path) -> InfospaceConfig:
    """Load an :class:`InfospaceConfig` from a YAML file.

    Args:
        path: Path to ``infospace.yaml``.

    Returns:
        The parsed configuration.

    Raises:
        FileNotFoundError: If *path* does not exist.
        ValueError: If the document is not a mapping, ``topic`` is
            missing or not a mapping, or any required field (such as
            ``topic.name``) is missing.
    """
    data = yaml.safe_load(path.read_text(encoding="utf-8"))
    if not isinstance(data, dict):
        raise ValueError(f"Expected a YAML mapping in {path}")
    if "topic" not in data:
        raise ValueError(f"Missing required 'topic' key in {path}")
    if not isinstance(data["topic"], dict):
        raise ValueError(f"Expected 'topic' to be a mapping in {path}")
    try:
        return InfospaceConfig.from_dict(data)
    except KeyError as err:
        # The model classes index required keys directly; convert the
        # resulting KeyError into the documented ValueError and include
        # the file path so the user knows which file to fix.
        raise ValueError(f"Missing required field {err} in {path}") from err
def save_infospace_config(config: InfospaceConfig, path: Path) -> None:
    """Serialize *config* as block-style YAML and write it to *path*.

    Missing parent directories of *path* are created first.  Keys are
    written in the order produced by ``config.to_dict()``.
    """
    target_dir = path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    document = yaml.safe_dump(
        config.to_dict(),
        default_flow_style=False,
        sort_keys=False,
    )
    path.write_text(document, encoding="utf-8")
def find_infospace_config(start: Optional[Path] = None) -> Optional[Path]:
    """Locate the nearest ``infospace.yaml`` at or above *start*.

    The search begins in *start* (the current working directory when
    omitted), after resolving it, and then visits each parent in turn.

    Returns:
        Path of the first ``infospace.yaml`` that is a file, or
        ``None`` if no directory on the way up contains one.
    """
    origin = (start if start else Path.cwd()).resolve()
    search_path = (origin, *origin.parents)
    candidates = (folder / "infospace.yaml" for folder in search_path)
    return next((hit for hit in candidates if hit.is_file()), None)