feat(infospace): add schema compliance validator (S1.2)

Deterministic validation of EntityMeta against declarative schemas:
section presence/word counts, heading format, domain enum values.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-19 00:48:57 +01:00
parent 03c6c5e8de
commit 9031e1162c
4 changed files with 850 additions and 0 deletions

View File

@@ -7,9 +7,35 @@ files and analysing infospace collections.
from .models import EntityMeta
from .entity_parser import parse_entity_file, parse_entity_directory
from .schema import (
ECONOMIC_ENTITY_SCHEMA,
EntitySchema,
EnumConstraint,
SectionRequirement,
SectionRule,
)
from .validator import (
BatchComplianceResult,
ComplianceDiagnostic,
ComplianceResult,
validate_entities,
validate_entity,
)
# Public API of the package: every name imported above from the models,
# entity_parser, schema and validator submodules is re-exported here.
__all__ = [
    # Models and parsing
    "EntityMeta",
    "parse_entity_file",
    "parse_entity_directory",
    # Schema
    "ECONOMIC_ENTITY_SCHEMA",
    "EntitySchema",
    "EnumConstraint",
    "SectionRequirement",
    "SectionRule",
    # Validator
    "BatchComplianceResult",
    "ComplianceDiagnostic",
    "ComplianceResult",
    "validate_entities",
    "validate_entity",
]

View File

@@ -0,0 +1,144 @@
"""
Declarative schema definitions for entity compliance validation.
A schema describes the expected structure of an entity: which sections
are required, word count bounds, heading format, and valid enum values.
Schemas are frozen (immutable once created).
"""
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Tuple
class SectionRequirement(Enum):
    """Strictness level attached to a section rule.

    The validator reports an error for an empty ``REQUIRED`` section, a
    warning for an empty ``RECOMMENDED`` section, and nothing for an
    empty ``OPTIONAL`` section.
    """

    # Absence is a compliance error.
    REQUIRED = "required"
    # Absence is reported as a warning only.
    RECOMMENDED = "recommended"
    # Absence is silently accepted.
    OPTIONAL = "optional"
@dataclass(frozen=True)
class SectionRule:
    """Immutable description of what is expected from one H2 section.

    Parameters
    ----------
    slug:
        Identifier of the section as used in entity metadata
        (e.g. ``definition``).
    label:
        Display name of the section, used in diagnostic messages.
    requirement:
        Strictness of the presence check (required / recommended /
        optional).
    min_words:
        Inclusive lower bound on the section's word count; ``None``
        disables the lower bound.
    max_words:
        Inclusive upper bound on the section's word count; ``None``
        disables the upper bound.
    """

    slug: str
    label: str
    requirement: SectionRequirement
    min_words: Optional[int] = None
    max_words: Optional[int] = None
@dataclass(frozen=True)
class EnumConstraint:
    """Constraint limiting a field to a set of allowed values.

    Parameters
    ----------
    field_name:
        The ``EntityMeta`` field to check (e.g. ``domain``).
    allowed_values:
        Tuple of acceptable string values.
    severity:
        ``"error"`` or ``"warning"`` when the value is not in the set.

    Raises
    ------
    ValueError
        If *severity* is neither ``"error"`` nor ``"warning"``.
    """

    field_name: str
    allowed_values: Tuple[str, ...]
    severity: str = "warning"

    def __post_init__(self) -> None:
        # Diagnostics are tallied by exact severity string, so a typo such
        # as "Error" would produce findings that count as neither an error
        # nor a warning. Reject it when the schema is built instead.
        if self.severity not in ("error", "warning"):
            raise ValueError(
                f"EnumConstraint.severity must be 'error' or 'warning', "
                f"got {self.severity!r}"
            )
@dataclass(frozen=True)
class EntitySchema:
    """Complete validation schema for an entity type.

    Parameters
    ----------
    name:
        Human-readable schema name (e.g. ``"Economic Entity"``).
    section_rules:
        Tuple of :class:`SectionRule` objects.
    enum_constraints:
        Tuple of :class:`EnumConstraint` objects.
    h1_title_case_severity:
        Severity for non-title-case H1 headings (``"error"`` or ``"warning"``).
    require_h1:
        Whether a non-empty slug (H1) is required.

    Raises
    ------
    ValueError
        If *h1_title_case_severity* is neither ``"error"`` nor
        ``"warning"``.
    """

    name: str
    section_rules: Tuple[SectionRule, ...]
    enum_constraints: Tuple[EnumConstraint, ...] = ()
    h1_title_case_severity: str = "warning"
    require_h1: bool = True

    def __post_init__(self) -> None:
        # The severity string is copied verbatim onto diagnostics, which are
        # counted by exact match on "error" / "warning". Any other value
        # would be silently dropped from both tallies, so fail fast here.
        if self.h1_title_case_severity not in ("error", "warning"):
            raise ValueError(
                f"EntitySchema.h1_title_case_severity must be 'error' or "
                f"'warning', got {self.h1_title_case_severity!r}"
            )
# ── Default schema for the economic-entity infospace ──────────────

# Values accepted for the ``domain`` field of an economic entity.
_ECONOMIC_DOMAINS: Tuple[str, ...] = (
    "Production",
    "Exchange",
    "Distribution",
    "Regulation",
    "General Theory",
)

# Section expectations: four required sections (only the definition has
# word-count bounds) followed by two optional ones.
_ECONOMIC_SECTION_RULES: Tuple[SectionRule, ...] = (
    SectionRule(
        slug="definition",
        label="Definition",
        requirement=SectionRequirement.REQUIRED,
        min_words=20,
        max_words=150,
    ),
    SectionRule(
        slug="source_chapter",
        label="Source Chapter",
        requirement=SectionRequirement.REQUIRED,
    ),
    SectionRule(
        slug="context",
        label="Context",
        requirement=SectionRequirement.REQUIRED,
    ),
    SectionRule(
        slug="economic_domain",
        label="Economic Domain",
        requirement=SectionRequirement.REQUIRED,
    ),
    SectionRule(
        slug="smith_s_original_wording",
        label="Smith's Original Wording",
        requirement=SectionRequirement.OPTIONAL,
    ),
    SectionRule(
        slug="modern_interpretation",
        label="Modern Interpretation",
        requirement=SectionRequirement.OPTIONAL,
    ),
)

ECONOMIC_ENTITY_SCHEMA = EntitySchema(
    name="Economic Entity",
    section_rules=_ECONOMIC_SECTION_RULES,
    enum_constraints=(
        # An unknown domain is reported as a warning, not an error.
        EnumConstraint(
            field_name="domain",
            allowed_values=_ECONOMIC_DOMAINS,
            severity="warning",
        ),
    ),
    h1_title_case_severity="warning",
    require_h1=True,
)

View File

@@ -0,0 +1,261 @@
"""
Schema compliance validator for entity metadata.
Validates :class:`~markitect.infospace.models.EntityMeta` instances
against a declarative :class:`~markitect.infospace.schema.EntitySchema`.
All checks are deterministic — no LLM calls.
"""
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Sequence
from .models import EntityMeta
from .schema import EntitySchema, SectionRequirement
# Translation table from schema section slugs to the EntityMeta attribute
# that holds the section's text. Slugs absent from this table are looked
# up on the entity under their own name by the validator.
_SECTION_FIELD_MAP: Dict[str, str] = {
    # Slug and attribute name coincide.
    "definition": "definition",
    "source_chapter": "source_chapter",
    "context": "context",
    # Slug and attribute name differ.
    "economic_domain": "domain",
    "smith_s_original_wording": "original_wording",
    # Slug and attribute name coincide.
    "modern_interpretation": "modern_interpretation",
}
@dataclass
class ComplianceDiagnostic:
    """One finding produced by a compliance check.

    ``code`` is a machine-readable identifier, ``message`` the
    human-readable explanation and ``severity`` either ``"error"`` or
    ``"warning"``. ``section`` and ``field`` optionally name the section
    slug or entity field the finding refers to.
    """

    code: str
    message: str
    severity: str  # "error" or "warning"
    section: Optional[str] = None
    field: Optional[str] = None

    def __str__(self) -> str:
        """Render as ``[SEVERITY] CODE: message`` plus optional location tags."""
        headline = f"[{self.severity.upper()}] {self.code}: {self.message}"
        # Location tags are appended only for values that are set and non-empty.
        tags = [
            f"({label}: {value})"
            for label, value in (("section", self.section), ("field", self.field))
            if value
        ]
        return " ".join([headline, *tags])
@dataclass
class ComplianceResult:
    """Outcome of validating one entity against one schema.

    Holds the diagnostics collected during validation together with the
    number of checks that were run. An entity is compliant when none of
    its diagnostics has severity ``"error"``; warnings do not affect
    compliance.
    """

    entity_slug: str
    schema_name: str
    diagnostics: List[ComplianceDiagnostic] = field(default_factory=list)
    checks_run: int = 0

    @property
    def errors(self) -> List[ComplianceDiagnostic]:
        """Diagnostics whose severity is exactly ``"error"``."""
        return [diag for diag in self.diagnostics if diag.severity == "error"]

    @property
    def warnings(self) -> List[ComplianceDiagnostic]:
        """Diagnostics whose severity is exactly ``"warning"``."""
        return [diag for diag in self.diagnostics if diag.severity == "warning"]

    @property
    def error_count(self) -> int:
        """Number of error-severity diagnostics."""
        return len(self.errors)

    @property
    def warning_count(self) -> int:
        """Number of warning-severity diagnostics."""
        return len(self.warnings)

    @property
    def is_compliant(self) -> bool:
        """``True`` when no error-severity diagnostic was recorded."""
        return not self.errors

    def summary(self) -> str:
        """Return a one-line PASS/FAIL summary with check and finding counts."""
        verdict = "PASS" if self.is_compliant else "FAIL"
        tallies = (
            f"{self.checks_run} checks, "
            f"{self.error_count} errors, "
            f"{self.warning_count} warnings"
        )
        return f"{self.entity_slug}: {verdict} ({tallies})"
@dataclass
class BatchComplianceResult:
    """Collection of per-entity results produced against a single schema.

    All aggregate figures are computed on demand from ``results``.
    """

    results: List[ComplianceResult] = field(default_factory=list)
    schema_name: str = ""

    @property
    def total_entities(self) -> int:
        """Number of entities that were validated."""
        return len(self.results)

    @property
    def compliant_count(self) -> int:
        """Number of entities without any error-severity diagnostic."""
        return len([res for res in self.results if res.is_compliant])

    @property
    def non_compliant_count(self) -> int:
        """Number of entities with at least one error-severity diagnostic."""
        return self.total_entities - self.compliant_count

    @property
    def total_errors(self) -> int:
        """Error count summed over all entities."""
        return sum(res.error_count for res in self.results)

    @property
    def total_warnings(self) -> int:
        """Warning count summed over all entities."""
        return sum(res.warning_count for res in self.results)

    def summary(self) -> str:
        """Return a multi-line report: four header lines, then one line per entity."""
        header = [
            f"Schema: {self.schema_name}",
            f"Entities: {self.total_entities}",
            f"Compliant: {self.compliant_count}/{self.total_entities}",
            f"Errors: {self.total_errors}, Warnings: {self.total_warnings}",
        ]
        per_entity = [f" {res.summary()}" for res in self.results]
        return "\n".join(header + per_entity)
def _word_count(text: str) -> int:
"""Count whitespace-separated words."""
return len(text.split())
def validate_entity(
    entity: EntityMeta,
    schema: EntitySchema,
) -> ComplianceResult:
    """Validate a single entity against *schema*.

    Three groups of checks run in order:

    1. **H1** - an empty slug is an error when ``schema.require_h1`` is
       set; a non-title-case H1 is reported with
       ``schema.h1_title_case_severity``.
    2. **Sections** - each rule's field is looked up on the entity (via
       ``_SECTION_FIELD_MAP``, falling back to the slug itself). An empty
       required section is an error, an empty recommended section a
       warning, an empty optional section is ignored. Non-empty sections
       are checked against the rule's word-count bounds: too short is an
       error, too long a warning.
    3. **Enum constraints** - a non-empty field value outside the
       allowed set is reported with the constraint's severity.

    ``checks_run`` on the result counts every check that was evaluated,
    whether or not it produced a diagnostic, so for a given schema it
    depends only on which sections have content.

    Returns a :class:`ComplianceResult` with all diagnostics found.
    """
    result = ComplianceResult(
        entity_slug=entity.slug,
        schema_name=schema.name,
    )
    diagnostics = result.diagnostics
    checks = 0

    # ── H1 checks ─────────────────────────────────────────────────
    if schema.require_h1:
        checks += 1
        if not entity.slug:
            diagnostics.append(
                ComplianceDiagnostic(
                    code="H1_MISSING",
                    message="Entity has no H1 heading (empty slug).",
                    severity="error",
                )
            )

    # Title case is only meaningful when there is a heading at all; a
    # missing heading is reported by H1_MISSING above.
    checks += 1
    if entity.slug and not entity.h1_is_title_case:
        diagnostics.append(
            ComplianceDiagnostic(
                code="H1_NOT_TITLE_CASE",
                message=f"H1 '{entity.h1_raw}' is not in title case.",
                severity=schema.h1_title_case_severity,
            )
        )

    # ── Section checks ────────────────────────────────────────────
    for rule in schema.section_rules:
        checks += 1  # presence check
        field_name = _SECTION_FIELD_MAP.get(rule.slug, rule.slug)
        value = getattr(entity, field_name, "")

        if not value or not value.strip():
            if rule.requirement == SectionRequirement.REQUIRED:
                diagnostics.append(
                    ComplianceDiagnostic(
                        code="SECTION_MISSING",
                        message=f"Required section '{rule.label}' is missing or empty.",
                        severity="error",
                        section=rule.slug,
                    )
                )
            elif rule.requirement == SectionRequirement.RECOMMENDED:
                diagnostics.append(
                    ComplianceDiagnostic(
                        code="SECTION_RECOMMENDED",
                        message=f"Recommended section '{rule.label}' is missing.",
                        severity="warning",
                        section=rule.slug,
                    )
                )
            # OPTIONAL + empty → no diagnostic
            continue

        # Word count bounds (only if section has content and the rule
        # actually configures a bound).
        if rule.min_words is None and rule.max_words is None:
            continue

        # Count the word-count check whenever it is evaluated, not only
        # when it fails, so checks_run does not vary with the outcome.
        checks += 1
        wc = _word_count(value)
        if rule.min_words is not None and wc < rule.min_words:
            diagnostics.append(
                ComplianceDiagnostic(
                    code="SECTION_TOO_SHORT",
                    message=(
                        f"Section '{rule.label}' has {wc} words "
                        f"(minimum: {rule.min_words})."
                    ),
                    severity="error",
                    section=rule.slug,
                )
            )
        elif rule.max_words is not None and wc > rule.max_words:
            diagnostics.append(
                ComplianceDiagnostic(
                    code="SECTION_TOO_LONG",
                    message=(
                        f"Section '{rule.label}' has {wc} words "
                        f"(maximum: {rule.max_words})."
                    ),
                    severity="warning",
                    section=rule.slug,
                )
            )

    # ── Enum constraints ──────────────────────────────────────────
    for constraint in schema.enum_constraints:
        checks += 1
        value = getattr(entity, constraint.field_name, "")
        # An empty field is not an enum violation; when a section rule
        # covers the same field, its presence check reports the gap.
        if not value or not value.strip():
            continue
        candidate = value.strip()
        if candidate not in constraint.allowed_values:
            diagnostics.append(
                ComplianceDiagnostic(
                    code="ENUM_VALUE_UNKNOWN",
                    message=(
                        f"Field '{constraint.field_name}' has value "
                        f"'{candidate}' which is not in the allowed set."
                    ),
                    severity=constraint.severity,
                    field=constraint.field_name,
                )
            )

    result.checks_run = checks
    return result
def validate_entities(
    entities: Sequence[EntityMeta],
    schema: EntitySchema,
) -> BatchComplianceResult:
    """Run :func:`validate_entity` over *entities* and collect the results.

    The returned :class:`BatchComplianceResult` carries the schema's name
    and one :class:`ComplianceResult` per entity, in input order.
    """
    per_entity = [validate_entity(item, schema) for item in entities]
    return BatchComplianceResult(results=per_entity, schema_name=schema.name)