Files
tegwick 9031e1162c feat(infospace): add schema compliance validator (S1.2)
Deterministic validation of EntityMeta against declarative schemas:
section presence/word counts, heading format, domain enum values.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 00:48:57 +01:00

262 lines
8.5 KiB
Python

"""
Schema compliance validator for entity metadata.
Validates :class:`~markitect.infospace.models.EntityMeta` instances
against a declarative :class:`~markitect.infospace.schema.EntitySchema`.
All checks are deterministic — no LLM calls.
"""
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Sequence
from .models import EntityMeta
from .schema import EntitySchema, SectionRequirement
# Maps section slugs (as they appear in the schema) to EntityMeta field
# names.  Most entries are identity mappings listed for completeness; the
# actual renames are ``economic_domain`` -> ``domain`` and
# ``smith_s_original_wording`` -> ``original_wording``.  A slug that is not
# listed here is looked up on the entity under its own name.
_SECTION_FIELD_MAP: Dict[str, str] = {
    "definition": "definition",
    "source_chapter": "source_chapter",
    "context": "context",
    "economic_domain": "domain",
    "smith_s_original_wording": "original_wording",
    "modern_interpretation": "modern_interpretation",
}
@dataclass
class ComplianceDiagnostic:
    """One finding produced while validating an entity.

    ``str()`` renders the finding as ``[SEVERITY] code: message`` followed by
    the section and/or field it refers to, when those are set.
    """

    # Short machine-readable identifier of the finding.
    code: str
    # Human-readable description of the finding.
    message: str
    # Either "error" or "warning".
    severity: str
    # Slug of the section the finding refers to, if any.
    section: Optional[str] = None
    # Name of the entity field the finding refers to, if any.
    field: Optional[str] = None

    def __str__(self) -> str:
        """Return the single-line, human-readable form of the finding."""
        text = f"[{self.severity.upper()}] {self.code}: {self.message}"
        # Location suffixes are appended only when they carry a value.
        if self.section:
            text += f" (section: {self.section})"
        if self.field:
            text += f" (field: {self.field})"
        return text
@dataclass
class ComplianceResult:
    """Outcome of validating one entity against one schema.

    The entity counts as compliant when none of its diagnostics has severity
    ``"error"``; warnings never affect compliance.
    """

    # Slug of the entity that was validated.
    entity_slug: str
    # Name of the schema the entity was validated against.
    schema_name: str
    # Every finding recorded for the entity, in the order it was found.
    diagnostics: List[ComplianceDiagnostic] = field(default_factory=list)
    # Number of checks counted for the entity.
    checks_run: int = 0

    def _with_severity(self, severity: str) -> List[ComplianceDiagnostic]:
        """Return the diagnostics whose severity equals *severity*."""
        return [diag for diag in self.diagnostics if diag.severity == severity]

    @property
    def errors(self) -> List[ComplianceDiagnostic]:
        """Diagnostics with severity ``"error"``."""
        return self._with_severity("error")

    @property
    def warnings(self) -> List[ComplianceDiagnostic]:
        """Diagnostics with severity ``"warning"``."""
        return self._with_severity("warning")

    @property
    def error_count(self) -> int:
        """Number of error diagnostics."""
        return len(self.errors)

    @property
    def warning_count(self) -> int:
        """Number of warning diagnostics."""
        return len(self.warnings)

    @property
    def is_compliant(self) -> bool:
        """``True`` when there is no error diagnostic."""
        return not self.errors

    def summary(self) -> str:
        """Return a one-line PASS/FAIL summary with the check and finding counts."""
        verdict = "FAIL" if self.errors else "PASS"
        counts = (
            f"{self.checks_run} checks, "
            f"{self.error_count} errors, "
            f"{self.warning_count} warnings"
        )
        return f"{self.entity_slug}: {verdict} ({counts})"
@dataclass
class BatchComplianceResult:
    """Validation results for a group of entities checked against one schema."""

    # One result per validated entity, in validation order.
    results: List[ComplianceResult] = field(default_factory=list)
    # Name of the schema the entities were validated against.
    schema_name: str = ""

    @property
    def total_entities(self) -> int:
        """Number of per-entity results held."""
        return len(self.results)

    @property
    def compliant_count(self) -> int:
        """Number of results that are compliant."""
        return len([res for res in self.results if res.is_compliant])

    @property
    def non_compliant_count(self) -> int:
        """Number of results that are not compliant."""
        return self.total_entities - self.compliant_count

    @property
    def total_errors(self) -> int:
        """Sum of the error counts of all results."""
        return sum(res.error_count for res in self.results)

    @property
    def total_warnings(self) -> int:
        """Sum of the warning counts of all results."""
        return sum(res.warning_count for res in self.results)

    def summary(self) -> str:
        """Return a multi-line report: four header lines, then one line per entity."""
        header = [
            f"Schema: {self.schema_name}",
            f"Entities: {self.total_entities}",
            f"Compliant: {self.compliant_count}/{self.total_entities}",
            f"Errors: {self.total_errors}, Warnings: {self.total_warnings}",
        ]
        per_entity = [f" {res.summary()}" for res in self.results]
        return "\n".join(header + per_entity)
def _word_count(text: str) -> int:
"""Count whitespace-separated words."""
return len(text.split())
def validate_entity(
    entity: EntityMeta,
    schema: EntitySchema,
) -> ComplianceResult:
    """Validate a single entity against *schema*.

    Three groups of checks run in order: H1 heading checks, per-section
    presence and word-count checks, and enum-value checks.  Every finding is
    appended to the result's ``diagnostics``.  ``checks_run`` counts each
    check that was evaluated, independent of whether it produced a
    diagnostic.

    Returns a :class:`ComplianceResult` with all diagnostics found.
    """
    result = ComplianceResult(
        entity_slug=entity.slug,
        schema_name=schema.name,
    )
    _check_h1(entity, schema, result)
    _check_sections(entity, schema, result)
    _check_enums(entity, schema, result)
    return result


def _check_h1(
    entity: EntityMeta,
    schema: EntitySchema,
    result: ComplianceResult,
) -> None:
    """Record H1 presence and title-case findings on *result*."""
    # ── H1 presence ───────────────────────────────────────────────
    if schema.require_h1:
        result.checks_run += 1
        if not entity.slug:
            result.diagnostics.append(
                ComplianceDiagnostic(
                    code="H1_MISSING",
                    message="Entity has no H1 heading (empty slug).",
                    severity="error",
                )
            )

    # ── H1 title case ─────────────────────────────────────────────
    # NOTE(review): this check is treated as independent of
    # ``schema.require_h1``; confirm that it is not meant to run only when
    # an H1 is required.
    result.checks_run += 1
    # An entity without a slug has no heading to judge.
    if entity.slug and not entity.h1_is_title_case:
        result.diagnostics.append(
            ComplianceDiagnostic(
                code="H1_NOT_TITLE_CASE",
                message=f"H1 '{entity.h1_raw}' is not in title case.",
                severity=schema.h1_title_case_severity,
            )
        )


def _check_sections(
    entity: EntityMeta,
    schema: EntitySchema,
    result: ComplianceResult,
) -> None:
    """Record section presence and word-count findings on *result*."""
    for rule in schema.section_rules:
        # Presence check: always evaluated for every rule.
        result.checks_run += 1
        field_name = _SECTION_FIELD_MAP.get(rule.slug, rule.slug)
        value = getattr(entity, field_name, "")

        if not value or not value.strip():
            if rule.requirement == SectionRequirement.REQUIRED:
                result.diagnostics.append(
                    ComplianceDiagnostic(
                        code="SECTION_MISSING",
                        message=f"Required section '{rule.label}' is missing or empty.",
                        severity="error",
                        section=rule.slug,
                    )
                )
            elif rule.requirement == SectionRequirement.RECOMMENDED:
                result.diagnostics.append(
                    ComplianceDiagnostic(
                        code="SECTION_RECOMMENDED",
                        message=f"Recommended section '{rule.label}' is missing.",
                        severity="warning",
                        section=rule.slug,
                    )
                )
            # Any other requirement + empty → no diagnostic.  Word bounds
            # are only meaningful for sections that have content.
            continue

        # Word-count bounds.  Each configured bound counts as one check
        # whether or not it is violated, so that ``checks_run`` does not
        # depend on the outcome of the validation.
        wc = _word_count(value)
        if rule.min_words is not None:
            result.checks_run += 1
        if rule.max_words is not None:
            result.checks_run += 1

        # At most one bound diagnostic per section; the minimum wins.
        if rule.min_words is not None and wc < rule.min_words:
            result.diagnostics.append(
                ComplianceDiagnostic(
                    code="SECTION_TOO_SHORT",
                    message=(
                        f"Section '{rule.label}' has {wc} words "
                        f"(minimum: {rule.min_words})."
                    ),
                    severity="error",
                    section=rule.slug,
                )
            )
        elif rule.max_words is not None and wc > rule.max_words:
            result.diagnostics.append(
                ComplianceDiagnostic(
                    code="SECTION_TOO_LONG",
                    message=(
                        f"Section '{rule.label}' has {wc} words "
                        f"(maximum: {rule.max_words})."
                    ),
                    severity="warning",
                    section=rule.slug,
                )
            )


def _check_enums(
    entity: EntityMeta,
    schema: EntitySchema,
    result: ComplianceResult,
) -> None:
    """Record findings for fields whose value is outside its allowed set."""
    for constraint in schema.enum_constraints:
        result.checks_run += 1
        raw = getattr(entity, constraint.field_name, "")
        # An empty value is not an enum violation.  Emptiness is reported by
        # the section checks, and only when a section rule covers the field.
        if not raw or not raw.strip():
            continue
        value = raw.strip()
        if value not in constraint.allowed_values:
            result.diagnostics.append(
                ComplianceDiagnostic(
                    code="ENUM_VALUE_UNKNOWN",
                    message=(
                        f"Field '{constraint.field_name}' has value "
                        f"'{value}' which is not in the allowed set."
                    ),
                    severity=constraint.severity,
                    field=constraint.field_name,
                )
            )
def validate_entities(
    entities: Sequence[EntityMeta],
    schema: EntitySchema,
) -> BatchComplianceResult:
    """Validate every entity in *entities* against *schema*.

    Entities are validated one by one with :func:`validate_entity`, in the
    order given.

    Returns a :class:`BatchComplianceResult` holding one result per entity.
    """
    # Read the schema name before validating anything, as the batch header.
    schema_name = schema.name
    per_entity = [validate_entity(item, schema) for item in entities]
    return BatchComplianceResult(results=per_entity, schema_name=schema_name)