Files
markitect-main/markitect/infospace/config.py
tegwick 81a4c8796a feat(infospace): add L2 entity classification with type × VSM matrix (S2.9)
Implements the L2 typed-entities layer — each entity is assigned an
Entity Type (Element, Process, Relation, Principle, Institution) and a
VSM System (S1–S5) by an LLM, with one-sentence rationales for each.

New modules:
- markitect/infospace/classification.py — EntityClassification dataclass
  + ENTITY_TYPES / VSM_SYSTEMS controlled vocabularies
- markitect/infospace/classification_io.py — write/read classification
  files (YAML frontmatter + markdown body, mirrors evaluation_io)
- markitect/infospace/classifier.py — build_classification_prompt(),
  parse_classification_response(), run_entity_classification(); batch
  runner writes files incrementally (same resumable pattern as evaluate)

CLI: markitect infospace classify [--entity SLUG] [--provider P] [--model M]
  - Incremental skip: checks output/classifications/ for existing files
  - Defaults to openrouter provider; 2000 max_tokens (Gemini 2.5 Flash
    uses ~787 thinking tokens, so 800 was too low)

CLI: markitect infospace classify-summary [--update-metrics]
  - Entity type counts + VSM system counts with percentages
  - 5 × 6 type × VSM matrix (spots structural blind spots at a glance)
  - --update-metrics writes type_distribution, type_entropy,
    vsm_type_matrix_cells to metrics.yaml

Config: InfospaceConfig gains classifications_dir (default output/classifications)
Schema: schemas/typed-entity-schema-v1.0.md — type/VSM vocabulary tables,
  rationale format rules, validation rules, metrics enabled at L2
infospace.yaml: schemas.typed_entity references typed-entity-schema-v1.0.md

Seed classifications (3): division_of_labour (Process/S1),
  natural_price_as_central_price (Principle/S2),
  invisible_hand_mechanism (Principle/S4)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-23 09:35:58 +01:00

357 lines
11 KiB
Python

"""
Infospace configuration model and YAML loader.

An infospace is declared via an ``infospace.yaml`` file that specifies
its topic, disciplines, schemas, competency questions, and viability
thresholds. This module provides the data models and I/O for that
configuration.

Example ``infospace.yaml``::

    topic:
      name: "The Wealth of Nations"
      domain: "Classical Economics"
      sources: artifacts/sources/
    disciplines:
      - name: "Viable System Model"
        path: artifacts/vsm-reference/
    schemas:
      entity: schemas/economic-entity-schema-v1.0.md
    competency_questions: schemas/competency-questions.md
    viability:
      coverage_ratio: { min: 0.60 }
      per_entity_mean: { min: 3.5 }
"""
from __future__ import annotations
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
import yaml
@dataclass
class TopicConfig:
    """Subject matter that an infospace sets out to explain.

    Attributes:
        name: Human-readable topic name (the only required field).
        domain: Broader knowledge domain; empty when unspecified.
        sources: Path to the source material, relative to the infospace
            root; empty when unspecified.
    """

    name: str
    domain: str = ""
    sources: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain mapping, leaving out empty optional fields."""
        optional = (("domain", self.domain), ("sources", self.sources))
        result: Dict[str, Any] = {"name": self.name}
        result.update((key, value) for key, value in optional if value)
        return result

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> TopicConfig:
        """Build a topic from a mapping.

        Raises:
            KeyError: If *data* has no ``name`` entry.
        """
        topic_name = data["name"]
        domain = data.get("domain", "")
        sources = data.get("sources", "")
        return cls(name=topic_name, domain=domain, sources=sources)
@dataclass
class DisciplineBinding:
    """External infospace that is applied as an analytical lens.

    Attributes:
        name: Human-readable discipline name (required).
        path: Path to the discipline infospace, relative to the root;
            empty when unspecified.
    """

    name: str
    path: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain mapping; ``path`` is omitted when empty."""
        result: Dict[str, Any] = {"name": self.name}
        if not self.path:
            return result
        result["path"] = self.path
        return result

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> DisciplineBinding:
        """Build a binding from a mapping.

        Raises:
            KeyError: If *data* has no ``name`` entry.
        """
        binding_name = data["name"]
        return cls(name=binding_name, path=data.get("path", ""))
@dataclass
class SchemaRegistry:
    """Schema paths governing entity and document structure.

    All paths are relative to the infospace root directory.

    Attributes:
        entity: Path of the entity schema; empty when unset.
        mapping: Path of the mapping schema; empty when unset.
        analysis: Path of the analysis schema; empty when unset.
        extra: Every other schema entry, keyed by the name it was
            declared under.
    """

    entity: str = ""
    mapping: str = ""
    analysis: str = ""
    extra: Dict[str, str] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain mapping.

        Empty named schemas are left out; ``extra`` entries are merged in
        last, at the same level as the named ones.
        """
        named = (
            ("entity", self.entity),
            ("mapping", self.mapping),
            ("analysis", self.analysis),
        )
        result: Dict[str, Any] = {key: value for key, value in named if value}
        result.update(self.extra)
        return result

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> SchemaRegistry:
        """Build a registry from a mapping.

        Keys other than ``entity``, ``mapping`` and ``analysis`` are
        collected into ``extra`` unchanged.
        """
        extra = {}
        for key, value in data.items():
            if key not in ("entity", "mapping", "analysis"):
                extra[key] = value
        return cls(
            entity=data.get("entity", ""),
            mapping=data.get("mapping", ""),
            analysis=data.get("analysis", ""),
            extra=extra,
        )
@dataclass
class ViabilityThreshold:
    """Threshold for a single viability metric.

    At least one of *min* or *max* should be set; a bound left as
    ``None`` is not enforced.

    Attributes:
        metric: Name of the metric the bounds apply to.
        min: Lowest acceptable value (inclusive), or ``None``.
        max: Highest acceptable value (inclusive), or ``None``.
    """

    metric: str
    min: Optional[float] = None
    max: Optional[float] = None

    def check(self, value: float) -> bool:
        """Return ``True`` if *value* is within the threshold.

        Both bounds are inclusive. A NaN *value* is rejected by every
        bound that is set, so an undefined metric can never satisfy a
        configured threshold.
        """
        # Phrased as "must satisfy the bound" rather than "must not
        # violate it": every ordering comparison with NaN is False, so
        # the violation form would silently accept NaN.
        meets_min = self.min is None or value >= self.min
        meets_max = self.max is None or value <= self.max
        return bool(meets_min and meets_max)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the bounds that are set; the metric name is not included."""
        d: Dict[str, Any] = {}
        if self.min is not None:
            d["min"] = self.min
        if self.max is not None:
            d["max"] = self.max
        return d
@dataclass
class PipelineStage:
    """A single stage in the processing pipeline.

    Attributes:
        template: Path to the template file (relative to infospace root).
        name: Human-readable stage name used in progress output.
        output_dir: Directory for stage outputs (relative to root).
        output_macro: Macro name for this stage's output, also used as
            the filename suffix (e.g. ``entities`` → ``<id>-entities.md``).
        split_entities: If True, parse ``--- ENTITY: <name> ---`` delimiters
            from LLM output and write individual entity files.
        macros: Static macros loaded from files (macro name → relative path).
        spaces: Legacy space IDs for SQLite-based resolver (unused by
            :class:`SourcePipeline`).
        max_tokens: Maximum tokens to request from the LLM for this stage.
            Overrides the pipeline-level default (4096).
    """

    template: str
    name: str = ""
    output_dir: str = ""
    output_macro: str = ""
    split_entities: bool = False
    macros: Dict[str, str] = field(default_factory=dict)
    spaces: List[str] = field(default_factory=list)
    max_tokens: Optional[int] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain mapping.

        ``template`` is always written. Other fields are written only when
        truthy, except ``max_tokens``, which is written whenever it is not
        ``None`` (so an explicit ``0`` survives).
        """
        optional = (
            ("name", self.name),
            ("output_dir", self.output_dir),
            ("output_macro", self.output_macro),
            ("split_entities", self.split_entities),
            ("macros", self.macros),
            ("spaces", self.spaces),
        )
        d: Dict[str, Any] = {"template": self.template}
        d.update((key, value) for key, value in optional if value)
        if self.max_tokens is not None:
            d["max_tokens"] = self.max_tokens
        return d

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> PipelineStage:
        """Build a stage from a mapping.

        A ``macros`` or ``spaces`` key that is present but empty (YAML
        null) is normalized to an empty container, so the container
        fields never hold ``None``.

        Raises:
            KeyError: If *data* has no ``template`` entry.
        """
        return cls(
            template=data["template"],
            name=data.get("name", ""),
            output_dir=data.get("output_dir", ""),
            output_macro=data.get("output_macro", ""),
            split_entities=data.get("split_entities", False),
            # ``or`` rather than a .get() default: the default only covers
            # a missing key, not a key whose value is None.
            macros=data.get("macros") or {},
            spaces=data.get("spaces") or [],
            max_tokens=data.get("max_tokens"),
        )
@dataclass
class PipelineConfig:
    """Processing pipeline configuration.

    Attributes:
        stages: Stages listed under the ``stages`` key.
        post_batch: Stages listed under the ``post_batch`` key.
    """

    stages: List[PipelineStage] = field(default_factory=list)
    post_batch: List[PipelineStage] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain mapping, omitting empty stage lists."""
        d: Dict[str, Any] = {}
        if self.stages:
            d["stages"] = [s.to_dict() for s in self.stages]
        if self.post_batch:
            d["post_batch"] = [s.to_dict() for s in self.post_batch]
        return d

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> PipelineConfig:
        """Build a pipeline from a mapping.

        A ``stages`` or ``post_batch`` key that is missing, or present but
        empty (YAML null), yields an empty list instead of raising.
        """
        # ``or []`` covers a key whose value is None; a .get() default
        # would only cover a missing key and iterating None would raise.
        stage_items = data.get("stages") or []
        post_batch_items = data.get("post_batch") or []
        return cls(
            stages=[PipelineStage.from_dict(s) for s in stage_items],
            post_batch=[PipelineStage.from_dict(s) for s in post_batch_items],
        )
@dataclass
class InfospaceConfig:
    """Complete infospace configuration, loaded from ``infospace.yaml``.

    This is the declarative description of an infospace: what it
    explains, through which lenses, governed by which schemas, and
    what quality thresholds it must meet.

    Attributes:
        topic: The subject matter (required).
        disciplines: Disciplines applied as analytical lenses.
        schemas: Schema paths governing entities and documents.
        competency_questions: Value of the ``competency_questions`` key;
            empty when unset.
        viability: Thresholds keyed by metric name.
        pipeline: Processing pipeline, or ``None`` when not configured.
        entities_dir: Entities directory (default ``output/entities``).
        evaluations_dir: Evaluations directory (default
            ``output/evaluations``).
        classifications_dir: Classifications directory (default
            ``output/classifications``).
        metrics_dir: Metrics directory (default ``output/metrics``).
        relations_dir: Relations directory (default ``output/relations``).
    """

    topic: TopicConfig
    disciplines: List[DisciplineBinding] = field(default_factory=list)
    schemas: SchemaRegistry = field(default_factory=SchemaRegistry)
    competency_questions: str = ""
    viability: Dict[str, ViabilityThreshold] = field(default_factory=dict)
    pipeline: Optional[PipelineConfig] = None
    entities_dir: str = "output/entities"
    evaluations_dir: str = "output/evaluations"
    classifications_dir: str = "output/classifications"
    metrics_dir: str = "output/metrics"
    relations_dir: str = "output/relations"

    # Names of the output-directory fields, in serialization order.
    # Deliberately un-annotated so @dataclass does not treat it as a field.
    _DIR_FIELDS = (
        "entities_dir",
        "evaluations_dir",
        "classifications_dir",
        "metrics_dir",
        "relations_dir",
    )

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain mapping suitable for YAML output.

        ``topic`` is always written. Empty sections are omitted, and an
        output directory is written only when it differs from its default.
        """
        from dataclasses import fields

        d: Dict[str, Any] = {"topic": self.topic.to_dict()}
        if self.disciplines:
            d["disciplines"] = [db.to_dict() for db in self.disciplines]
        schemas_dict = self.schemas.to_dict()
        if schemas_dict:
            d["schemas"] = schemas_dict
        if self.competency_questions:
            d["competency_questions"] = self.competency_questions
        if self.viability:
            d["viability"] = {
                name: t.to_dict() for name, t in self.viability.items()
            }
        if self.pipeline:
            d["pipeline"] = self.pipeline.to_dict()
        # Compare against the declared field defaults so each default is
        # spelled out in exactly one place (the field declarations above).
        defaults = {f.name: f.default for f in fields(self)}
        for dir_field in self._DIR_FIELDS:
            value = getattr(self, dir_field)
            if value != defaults[dir_field]:
                d[dir_field] = value
        return d

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> InfospaceConfig:
        """Build a configuration from a parsed ``infospace.yaml`` mapping.

        Optional sections that are missing, or present but empty (YAML
        null), fall back to their defaults instead of raising.

        Raises:
            KeyError: If *data* has no ``topic`` entry, or a nested
                required key (such as the topic ``name``) is missing.
        """
        # ``x or default`` throughout: a .get() default only covers a
        # missing key, while an empty YAML key parses to None.
        viability = {
            name: ViabilityThreshold(metric=name, **(bounds or {}))
            for name, bounds in (data.get("viability") or {}).items()
        }
        pipeline_raw = data.get("pipeline")
        pipeline = PipelineConfig.from_dict(pipeline_raw) if pipeline_raw else None
        # Only pass directories that were actually given; the dataclass
        # field defaults cover the rest.
        dir_overrides = {
            name: data[name]
            for name in cls._DIR_FIELDS
            if data.get(name) is not None
        }
        return cls(
            topic=TopicConfig.from_dict(data["topic"]),
            disciplines=[
                DisciplineBinding.from_dict(d)
                for d in data.get("disciplines") or []
            ],
            schemas=SchemaRegistry.from_dict(data.get("schemas") or {}),
            competency_questions=data.get("competency_questions") or "",
            viability=viability,
            pipeline=pipeline,
            **dir_overrides,
        )
def load_infospace_config(path: Path) -> InfospaceConfig:
    """Load an :class:`InfospaceConfig` from a YAML file.

    Args:
        path: Path to ``infospace.yaml``.

    Returns:
        The parsed configuration.

    Raises:
        FileNotFoundError: If *path* does not exist.
        ValueError: If the document is not a mapping, or a required field
            (the ``topic`` section or a required key nested inside the
            configuration) is missing.
    """
    data = yaml.safe_load(path.read_text(encoding="utf-8"))
    if not isinstance(data, dict):
        raise ValueError(f"Expected a YAML mapping in {path}")
    if "topic" not in data:
        raise ValueError(f"Missing required 'topic' key in {path}")
    try:
        return InfospaceConfig.from_dict(data)
    except KeyError as err:
        # Nested required keys surface as KeyError from the from_dict
        # constructors; report them as the documented ValueError while
        # keeping the original exception as the cause.
        raise ValueError(f"Missing required field {err} in {path}") from err
def save_infospace_config(config: InfospaceConfig, path: Path) -> None:
    """Serialize *config* as YAML and write it to *path*.

    Missing parent directories of *path* are created first. Keys are
    written in block style, in the order ``config.to_dict()`` yields them.

    Args:
        config: The configuration to persist.
        path: Destination file.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    serialized = yaml.safe_dump(
        config.to_dict(),
        default_flow_style=False,
        sort_keys=False,
    )
    path.write_text(serialized, encoding="utf-8")
def find_infospace_config(start: Optional[Path] = None) -> Optional[Path]:
    """Locate the nearest ``infospace.yaml`` at or above *start*.

    The search begins at the resolved *start* (the current working
    directory when *start* is omitted) and proceeds through each of its
    parents in turn.

    Returns:
        Path of the first ``infospace.yaml`` that is a regular file, or
        ``None`` if no directory on the way up contains one.
    """
    origin = (start or Path.cwd()).resolve()
    candidates = (
        folder / "infospace.yaml" for folder in (origin, *origin.parents)
    )
    return next((found for found in candidates if found.is_file()), None)