Contract framework with markdown-native contracts utilizing fenced YAML blocks

This commit is contained in:
2026-05-03 22:51:13 +02:00
parent 3cfda33bc9
commit e3e13ee45a
36 changed files with 2877 additions and 13 deletions

View File

@@ -9,6 +9,18 @@ from markitect_tool.core import (
parse_markdown,
parse_markdown_file,
)
from markitect_tool.contract import (
ContractCheckResult,
ContractValidationResult,
DocumentContract,
check_document_contract,
check_markdown_file,
collect_metrics,
load_contract_file,
validate_contract,
validate_contract_file,
)
from markitect_tool.diagnostics import Diagnostic, SourceLocation
from markitect_tool.schema import (
MarkdownSchema,
SchemaValidationResult,
@@ -32,4 +44,15 @@ __all__ = [
"load_schema_file",
"validate_document",
"validate_markdown_file",
"ContractCheckResult",
"ContractValidationResult",
"DocumentContract",
"check_document_contract",
"check_markdown_file",
"collect_metrics",
"load_contract_file",
"validate_contract",
"validate_contract_file",
"Diagnostic",
"SourceLocation",
]

View File

@@ -9,6 +9,13 @@ import click
import yaml
from markitect_tool.core import parse_markdown_file
from markitect_tool.contract import (
ContractLoaderError,
check_markdown_file,
collect_metrics,
load_contract_file,
validate_contract,
)
from markitect_tool.schema import load_schema_file, validate_markdown_file, validate_schema
@@ -41,6 +48,23 @@ def parse(file: Path, output_format: str) -> None:
click.echo(json.dumps(data, indent=2, ensure_ascii=False))
@main.command()
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
    "--format",
    "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text",
    show_default=True,
)
def metrics(file: Path, output_format: str) -> None:
    """Report practical size and complexity metrics for a Markdown file."""
    # Parse, measure, then tag the payload with the path it was computed from
    # so the emitted report is self-describing.
    measured = collect_metrics(parse_markdown_file(file)).to_dict()
    report = measured | {"document_path": str(file)}
    _emit_metrics(report, output_format)
@main.command()
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
@@ -88,6 +112,54 @@ def schema_validate(schema_file: Path, output_format: str) -> None:
raise click.exceptions.Exit(0 if result.valid else 1)
# Command group under `main`; the `validate` and `check` subcommands
# are registered on it via `@contract.command(...)`.
@main.group()
def contract() -> None:
"""Work with Markdown document contracts."""
@contract.command("validate")
@click.argument("contract_file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
    "--format",
    "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text",
    show_default=True,
)
def contract_validate(contract_file: Path, output_format: str) -> None:
    """Validate that a Markdown contract file is well formed."""
    # Loading can fail before validation even starts (no contract block,
    # malformed definition). Surface that as a regular CLI error, the same way
    # `contract check` does, instead of letting the exception escape.
    try:
        loaded = load_contract_file(contract_file)
    except ContractLoaderError as exc:
        raise click.ClickException(str(exc)) from exc

    result = validate_contract(loaded)
    _emit_diagnostic_result(result.to_dict(), output_format)
    # Exit status mirrors validity: 0 when valid, 1 otherwise.
    raise click.exceptions.Exit(0 if result.valid else 1)
@contract.command("check")
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
    "--contract",
    "contract_file",
    required=True,
    type=click.Path(exists=True, dir_okay=False, path_type=Path),
)
@click.option(
    "--format",
    "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text",
    show_default=True,
)
def contract_check(file: Path, contract_file: Path, output_format: str) -> None:
    """Check a Markdown file against a Markdown document contract."""
    # Loader failures become ordinary CLI errors rather than tracebacks.
    try:
        outcome = check_markdown_file(file, contract_file)
    except ContractLoaderError as exc:
        raise click.ClickException(str(exc)) from exc

    _emit_diagnostic_result(outcome.to_dict(), output_format)
    # Exit status mirrors the check outcome: 0 when valid, 1 otherwise.
    exit_code = 0 if outcome.valid else 1
    raise click.exceptions.Exit(exit_code)
def _emit_result(data: dict, output_format: str) -> None:
if output_format == "json":
click.echo(json.dumps(data, indent=2, ensure_ascii=False))
@@ -102,5 +174,45 @@ def _emit_result(data: dict, output_format: str) -> None:
click.echo(f"- {violation['path']}: {violation['message']}")
def _emit_diagnostic_result(data: dict, output_format: str) -> None:
    """Print a diagnostic result as JSON, YAML, or a plain-text listing.

    Args:
        data: Result mapping; the text form reads its ``valid`` and
            ``diagnostics`` keys.
        output_format: ``"json"`` or ``"yaml"``; any other value selects the
            plain-text listing.
    """
    if output_format == "json":
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
        return
    if output_format == "yaml":
        click.echo(yaml.safe_dump(data, sort_keys=False))
        return

    # Text form: a one-word verdict followed by one bullet per diagnostic.
    verdict = "valid" if data.get("valid") else "invalid"
    click.echo(verdict)
    for entry in data.get("diagnostics", []):
        headline = f"- [{entry['severity']}] {entry['code']}: {entry['message']}"
        click.echo(headline)
        if entry.get("source"):
            location = entry["source"]
            # The line number is appended only when one is recorded.
            line_suffix = f":{location['line']}" if location.get("line") else ""
            click.echo(f" source: {location.get('path', '<document>')}{line_suffix}")
        if entry.get("guidance"):
            click.echo(f" guidance: {entry['guidance']}")
def _emit_metrics(data: dict, output_format: str) -> None:
    """Print a metrics report as JSON, YAML, or a plain-text listing.

    Args:
        data: Metrics mapping; the text form requires a ``document`` key and
            optionally reads ``sections``.
        output_format: ``"json"`` or ``"yaml"``; any other value selects the
            plain-text listing.
    """
    if output_format == "json":
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
        return
    if output_format == "yaml":
        click.echo(yaml.safe_dump(data, sort_keys=False))
        return

    # Looked up before anything is printed so a missing key fails cleanly.
    document_metrics = data["document"]
    click.echo("document")
    for name, amount in document_metrics.items():
        click.echo(f"- {name}: {amount}")

    per_section = data.get("sections", [])
    if not per_section:
        return
    click.echo("sections")
    for row in per_section:
        summary = (
            f"- {row['heading']}: words={row['words']}, "
            f"paragraphs={row['paragraphs']}, line={row['line']}"
        )
        click.echo(summary)
# Run the CLI entry point when this module is executed as a script.
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,47 @@
"""Document contract loading, metrics, and validation."""
from markitect_tool.contract.checker import (
ContractCheckResult,
ContractValidationResult,
check_document_contract,
check_markdown_file,
validate_contract,
validate_contract_file,
)
from markitect_tool.contract.loader import (
ContractLoaderError,
ContractNotFoundError,
InvalidContractFormatError,
load_contract_file,
load_contract_text,
)
from markitect_tool.contract.metrics import DocumentMetrics, SectionMetrics, collect_metrics
from markitect_tool.contract.model import (
AssertionSpec,
DocumentContract,
FieldSpec,
MetricBand,
SectionSpec,
)
# Names re-exported by this package: the model specs, the loader and its
# error types, the metrics types, and the check/validate entry points
# imported above.
__all__ = [
"AssertionSpec",
"ContractCheckResult",
"ContractLoaderError",
"ContractNotFoundError",
"ContractValidationResult",
"DocumentContract",
"DocumentMetrics",
"FieldSpec",
"InvalidContractFormatError",
"MetricBand",
"SectionMetrics",
"SectionSpec",
"check_document_contract",
"check_markdown_file",
"collect_metrics",
"load_contract_file",
"load_contract_text",
"validate_contract",
"validate_contract_file",
]

View File

@@ -0,0 +1,945 @@
"""Validate contracts and check Markdown documents against them."""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from markitect_tool.contract.loader import load_contract_file
from markitect_tool.contract.metrics import DocumentMetrics, SectionMetrics, collect_metrics
from markitect_tool.contract.model import (
FIELD_TYPES,
METRIC_NAMES,
PRESENCE_VALUES,
AssertionSpec,
DocumentContract,
FieldSpec,
MetricBand,
SectionSpec,
normalize_metric_name,
)
from markitect_tool.core import Document, Section, parse_markdown_file
from markitect_tool.diagnostics import (
Diagnostic,
SourceLocation,
has_error,
valid_severity,
)
@dataclass(frozen=True)
class ContractValidationResult:
    """Outcome of validating a contract definition on its own."""

    valid: bool
    diagnostics: list[Diagnostic]
    contract_path: str | None = None

    def to_dict(self) -> dict[str, Any]:
        """Return a plain mapping of the result, omitting ``None`` values."""
        pairs = (
            ("valid", self.valid),
            ("diagnostics", [entry.to_dict() for entry in self.diagnostics]),
            ("contract_path", self.contract_path),
        )
        return {name: item for name, item in pairs if item is not None}
@dataclass(frozen=True)
class ContractCheckResult:
    """Outcome of checking a single document against a single contract."""

    valid: bool
    diagnostics: list[Diagnostic]
    document_path: str | None = None
    contract_path: str | None = None
    metrics: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Return a plain mapping of the result, omitting ``None`` values.

        An empty ``metrics`` mapping is treated as absent and left out.
        """
        pairs = (
            ("valid", self.valid),
            ("diagnostics", [entry.to_dict() for entry in self.diagnostics]),
            ("document_path", self.document_path),
            ("contract_path", self.contract_path),
            ("metrics", self.metrics or None),
        )
        return {name: item for name, item in pairs if item is not None}
def validate_contract_file(contract_path: str | Path) -> ContractValidationResult:
    """Read the contract at *contract_path* and validate its definition."""
    loaded = load_contract_file(contract_path)
    return validate_contract(loaded)
def validate_contract(contract: DocumentContract) -> ContractValidationResult:
    """Check that a contract definition is internally well formed.

    Collects diagnostics for a missing id or document type, for every section,
    field, metric band and assertion specification, and for section ids that
    are declared more than once.

    Returns:
        A result whose ``valid`` flag is ``not has_error(diagnostics)``.
    """
    origin = _contract_location(contract)

    def problem(code: str, message: str, **extra: Any) -> Diagnostic:
        # Every diagnostic raised directly here is an error located at the contract.
        return Diagnostic(
            severity="error", code=code, message=message, contract=origin, **extra
        )

    found: list[Diagnostic] = []

    if not contract.id:
        found.append(
            problem(
                "contract.id.missing",
                "Contract must declare an id.",
                guidance="Add `id` to the contract YAML block or frontmatter.",
            )
        )
    if not contract.document_type:
        found.append(
            problem(
                "contract.document_type.missing",
                "Contract must declare the document type it governs.",
                guidance="Add `document.type` or `document_type` to the contract.",
            )
        )

    seen_ids: set[str] = set()
    for spec in contract.sections:
        found += _validate_section_spec(spec, contract)
        if not spec.id:
            continue
        if spec.id in seen_ids:
            found.append(
                problem(
                    "contract.section.id.duplicate",
                    f"Section id `{spec.id}` is declared more than once.",
                    rule_id=spec.id,
                )
            )
        seen_ids.add(spec.id)

    for spec in contract.fields:
        found += _validate_field_spec(spec, contract)
    for metric_band in contract.metrics:
        found += _validate_metric_band(metric_band, contract, rule_id=metric_band.rule_id)
    for rule in contract.assertions:
        found += _validate_assertion(rule, contract)

    return ContractValidationResult(
        valid=not has_error(found),
        diagnostics=found,
        contract_path=contract.source_path,
    )
def check_markdown_file(
    markdown_path: str | Path, contract_path: str | Path
) -> ContractCheckResult:
    """Check the Markdown file at *markdown_path* against a contract file.

    The document is parsed before the contract is loaded.
    """
    parsed = parse_markdown_file(markdown_path)
    return check_document_contract(parsed, load_contract_file(contract_path))
def check_document_contract(
    document: Document, contract: DocumentContract
) -> ContractCheckResult:
    """Check an already-parsed document against a document contract.

    The contract is validated first and its diagnostics are always reported.
    The document-level checks run only when the contract has no errors.
    Metrics are collected and attached to the result either way.
    """
    contract_report = validate_contract(contract)
    measured = collect_metrics(document)

    findings = [*contract_report.diagnostics]
    if contract_report.valid:
        findings += _check_document_type(document, contract)
        findings += _check_fields(document, contract)
        findings += _check_document_metrics(document, contract, measured)
        findings += _check_assertions(document.body, contract.assertions, document, contract)
        findings += _check_sections(document, contract, measured)

    return ContractCheckResult(
        valid=not has_error(findings),
        diagnostics=findings,
        document_path=document.source_path,
        contract_path=contract.source_path,
        metrics=measured.to_dict(),
    )
def _validate_section_spec(
    section: SectionSpec, contract: DocumentContract
) -> list[Diagnostic]:
    """Validate one section specification of a contract.

    Reports a missing id, an unsupported presence value and a non-integer
    level, then appends the diagnostics of the section's own metric bands and
    assertions.
    """
    origin = _contract_location(contract)

    def problem(code: str, message: str, **extra: Any) -> Diagnostic:
        return Diagnostic(
            severity="error", code=code, message=message, contract=origin, **extra
        )

    found: list[Diagnostic] = []

    if not section.id:
        found.append(
            problem(
                "contract.section.id.missing",
                "Every section specification must declare an id.",
            )
        )

    if section.presence not in PRESENCE_VALUES:
        label = section.id or "<missing>"
        found.append(
            problem(
                "contract.section.presence.invalid",
                f"Section `{label}` uses unsupported presence `{section.presence}`.",
                rule_id=section.id,
            )
        )

    if not (section.level is None or isinstance(section.level, int)):
        found.append(
            problem(
                "contract.section.level.invalid",
                f"Section `{section.id}` level must be an integer.",
                rule_id=section.id,
            )
        )

    for metric_band in section.metrics:
        found += _validate_metric_band(metric_band, contract, rule_id=section.id)
    for rule in section.assertions:
        found += _validate_assertion(rule, contract)
    return found
def _validate_field_spec(field_spec: FieldSpec, contract: DocumentContract) -> list[Diagnostic]:
    """Validate one field specification of a contract.

    Reports a missing id and an unsupported type, and validates the field's
    regular-expression pattern when one is set.
    """
    origin = _contract_location(contract)

    def problem(code: str, message: str, **extra: Any) -> Diagnostic:
        return Diagnostic(
            severity="error", code=code, message=message, contract=origin, **extra
        )

    found: list[Diagnostic] = []

    if not field_spec.id:
        found.append(
            problem(
                "contract.field.id.missing",
                "Every field specification must declare an id.",
            )
        )

    declared_type = field_spec.type
    if declared_type and declared_type not in FIELD_TYPES:
        found.append(
            problem(
                "contract.field.type.invalid",
                f"Field `{field_spec.id}` uses unsupported type `{field_spec.type}`.",
                rule_id=field_spec.id,
            )
        )

    if field_spec.pattern:
        found += _validate_regex(field_spec.pattern, contract, field_spec.id)
    return found
def _validate_metric_band(
    band: MetricBand, contract: DocumentContract, rule_id: str | None = None
) -> list[Diagnostic]:
    """Validate one metric band of a contract.

    Args:
        band: The metric band to validate.
        contract: The owning contract; used for the diagnostic location.
        rule_id: Rule identifier attached to every diagnostic produced here.

    Returns:
        Error diagnostics for a non-mapping band (reported alone), an unknown
        metric name, unsupported severities, a band with neither bound,
        non-numeric bounds, and ``min`` greater than ``max``.
    """
    origin = _contract_location(contract)

    def problem(code: str, message: str) -> Diagnostic:
        return Diagnostic(
            severity="error",
            code=code,
            message=message,
            contract=origin,
            rule_id=rule_id,
        )

    # A band that is not a mapping cannot be inspected further.
    if not isinstance(band.raw, dict):
        return [
            problem(
                "contract.metric.band.invalid",
                f"Metric `{band.metric}` band must be a mapping.",
            )
        ]

    diagnostics: list[Diagnostic] = []

    if band.metric not in METRIC_NAMES:
        diagnostics.append(
            problem("contract.metric.unknown", f"Unsupported metric `{band.metric}`.")
        )

    # Check each distinct severity once, in declaration order. A tuple plus an
    # equality-based "seen" list keeps the diagnostic order stable across runs
    # (set iteration order over strings depends on hash randomisation) and
    # does not require the values to be hashable at this point.
    checked: list[Any] = []
    for severity in (band.severity, band.min_severity, band.max_severity):
        if severity is None or severity in checked:
            continue
        checked.append(severity)
        if not valid_severity(severity):
            diagnostics.append(
                problem(
                    "contract.severity.invalid",
                    f"Unsupported severity `{severity}` for metric `{band.metric}`.",
                )
            )

    if band.min is None and band.max is None:
        diagnostics.append(
            problem(
                "contract.metric.band.empty",
                f"Metric `{band.metric}` needs at least one of min or max.",
            )
        )

    min_is_numeric = isinstance(band.min, int | float)
    max_is_numeric = isinstance(band.max, int | float)

    if band.min is not None and not min_is_numeric:
        diagnostics.append(
            problem(
                "contract.metric.min.invalid",
                f"Metric `{band.metric}` min must be numeric.",
            )
        )
    if band.max is not None and not max_is_numeric:
        diagnostics.append(
            problem(
                "contract.metric.max.invalid",
                f"Metric `{band.metric}` max must be numeric.",
            )
        )

    # Only compare the bounds when both are numeric.
    if min_is_numeric and max_is_numeric and band.min > band.max:
        diagnostics.append(
            problem(
                "contract.metric.band.inverted",
                f"Metric `{band.metric}` min cannot be greater than max.",
            )
        )
    return diagnostics
def _validate_assertion(
    assertion: AssertionSpec, contract: DocumentContract
) -> list[Diagnostic]:
    """Validate one assertion specification of a contract.

    Reports an unsupported severity and an assertion with no condition at all,
    then validates every ``matches`` / ``not_matches`` regular expression.
    """
    origin = _contract_location(contract)

    def problem(code: str, message: str) -> Diagnostic:
        return Diagnostic(
            severity="error",
            code=code,
            message=message,
            contract=origin,
            rule_id=assertion.id,
        )

    found: list[Diagnostic] = []

    if not valid_severity(assertion.severity):
        found.append(
            problem(
                "contract.severity.invalid",
                f"Unsupported assertion severity `{assertion.severity}`.",
            )
        )

    conditions = (
        assertion.contains,
        assertion.contains_any,
        assertion.not_contains,
        assertion.matches,
        assertion.not_matches,
    )
    if not any(conditions):
        found.append(
            problem(
                "contract.assertion.empty",
                "Assertion needs at least one deterministic condition.",
            )
        )

    for expression in assertion.matches + assertion.not_matches:
        found += _validate_regex(expression, contract, assertion.id)
    return found
def _validate_regex(
    pattern: str, contract: DocumentContract, rule_id: str | None
) -> list[Diagnostic]:
    """Return an error diagnostic if *pattern* does not compile, else ``[]``."""
    try:
        re.compile(pattern)
    except re.error as exc:
        failure = Diagnostic(
            severity="error",
            code="contract.regex.invalid",
            message=f"Invalid regular expression `{pattern}`: {exc}",
            contract=_contract_location(contract),
            rule_id=rule_id,
        )
        return [failure]
    else:
        return []
def _check_document_type(document: Document, contract: DocumentContract) -> list[Diagnostic]:
    """Report a mismatch between the document's declared type and the contract's.

    The declared type is the first truthy frontmatter value among
    ``document_type``, ``document-type`` and ``type``. Nothing is reported
    when either side declares no type.
    """
    declared = None
    for key in ("document_type", "document-type", "type"):
        declared = document.frontmatter.get(key)
        if declared:
            break
    if not declared:
        return []

    expected = contract.document_type
    if not expected or str(declared) == expected:
        return []

    mismatch = Diagnostic(
        severity="error",
        code="contract.document_type.mismatch",
        message=(
            f"Document declares type `{declared}`, but contract expects "
            f"`{contract.document_type}`."
        ),
        source=SourceLocation(path=document.source_path, line=1),
        contract=_contract_location(contract),
        rule_id=contract.id,
        guidance="Use the matching contract or update the document frontmatter type.",
    )
    return [mismatch]
def _check_fields(document: Document, contract: DocumentContract) -> list[Diagnostic]:
    """Check every field specification against the document's dict form.

    A required field that cannot be resolved yields ``contract.field.missing``.
    An optional field that is absent is skipped. A resolved value is handed to
    the value checks.
    """
    findings: list[Diagnostic] = []
    snapshot = document.to_dict()

    for spec in contract.fields:
        value, present = _resolve_path(snapshot, spec.path or "")
        where = SourceLocation(path=document.source_path, line=1)
        if present:
            findings += _check_field_value(spec, value, where, contract)
        elif spec.required:
            findings.append(
                Diagnostic(
                    severity="error",
                    code="contract.field.missing",
                    message=f"Required field `{spec.id}` is missing.",
                    source=where,
                    contract=_contract_location(contract),
                    rule_id=spec.id,
                    guidance=f"Provide `{spec.path}` in the document or context.",
                )
            )
    return findings
def _check_field_value(
field_spec: FieldSpec,
value: Any,
field_location: SourceLocation,
contract: DocumentContract,
) -> list[Diagnostic]:
diagnostics: list[Diagnostic] = []
if field_spec.type and not _value_matches_type(value, field_spec.type):
diagnostics.append(
Diagnostic(
severity="error",
code="contract.field.type_mismatch",
message=(
f"Field `{field_spec.id}` must be `{field_spec.type}`, "
f"got `{type(value).__name__}`."
),
source=field_location,
contract=_contract_location(contract),
rule_id=field_spec.id,
)
)
if field_spec.enum is not None and value not in field_spec.enum:
diagnostics.append(
Diagnostic(
severity="error",
code="contract.field.enum",
message=f"Field `{field_spec.id}` must be one of {field_spec.enum}.",
source=field_location,
contract=_contract_location(contract),
rule_id=field_spec.id,
)
)
if field_spec.pattern and isinstance(value, str) and not re.search(field_spec.pattern, value):
diagnostics.append(
Diagnostic(
severity="error",
code="contract.field.pattern",
message=f"Field `{field_spec.id}` does not match its required pattern.",
source=field_location,
contract=_contract_location(contract),
rule_id=field_spec.id,
)
)
if field_spec.min_length is not None and hasattr(value, "__len__") and len(value) < field_spec.min_length:
diagnostics.append(
Diagnostic(
severity="error",
code="contract.field.min_length",
message=f"Field `{field_spec.id}` is shorter than {field_spec.min_length}.",
source=field_location,
contract=_contract_location(contract),
rule_id=field_spec.id,
)
)
if field_spec.max_length is not None and hasattr(value, "__len__") and len(value) > field_spec.max_length:
diagnostics.append(
Diagnostic(
severity="error",
code="contract.field.max_length",
message=f"Field `{field_spec.id}` is longer than {field_spec.max_length}.",
source=field_location,
contract=_contract_location(contract),
rule_id=field_spec.id,
)
)
if field_spec.min is not None and isinstance(value, int | float) and value < field_spec.min:
diagnostics.append(
Diagnostic(
severity="error",
code="contract.field.min",
message=f"Field `{field_spec.id}` is below {field_spec.min}.",
source=field_location,
contract=_contract_location(contract),
rule_id=field_spec.id,
)
)
if field_spec.max is not None and isinstance(value, int | float) and value > field_spec.max:
diagnostics.append(
Diagnostic(
severity="error",
code="contract.field.max",
message=f"Field `{field_spec.id}` is above {field_spec.max}.",
source=field_location,
contract=_contract_location(contract),
rule_id=field_spec.id,
)
)
return diagnostics
def _check_document_metrics(
    document: Document,
    contract: DocumentContract,
    metrics: DocumentMetrics,
) -> list[Diagnostic]:
    """Check the contract's document-level metric bands against measured values."""
    measured = metrics.to_dict()["document"]
    where = SourceLocation(path=document.source_path, line=1)
    label = f"document `{contract.document_type or contract.id}`"
    return _check_bands(
        contract.metrics,
        measured,
        source=where,
        contract=contract,
        subject=label,
    )
def _check_sections(
    document: Document,
    contract: DocumentContract,
    metrics: DocumentMetrics,
) -> list[Diagnostic]:
    """Check every section specification against the document's sections.

    For each specification: presence is checked first. When matching sections
    exist and the section is not forbidden or discouraged, duplicates are
    flagged and every match is checked for heading level, metric bands and
    assertions. Ordering constraints are evaluated last, across all
    specifications.
    """
    findings: list[Diagnostic] = []
    # Section metrics are looked up by the position of the section in the document.
    metrics_at = dict(enumerate(metrics.section_metrics))
    located: dict[str, list[tuple[int, Section]]] = {}
    presence_only = {"forbidden", "discouraged"}

    for spec in contract.sections:
        hits = _matching_sections(document.sections, spec)
        if spec.id:
            located[spec.id] = hits
        findings += _check_section_presence(document, contract, spec, hits)

        if not hits or spec.presence in presence_only:
            continue

        if len(hits) > 1:
            # The warning points at the second occurrence.
            repeat = hits[1][1]
            findings.append(
                Diagnostic(
                    severity="warning",
                    code="contract.section.duplicate",
                    message=f"Section `{spec.id}` appears {len(hits)} times.",
                    source=SourceLocation(path=document.source_path, line=repeat.heading.line),
                    contract=_contract_location(contract),
                    rule_id=spec.id,
                    guidance="Keep one authoritative section or split it into distinct section roles.",
                )
            )

        for position, found in hits:
            findings += _check_section_level(document, contract, spec, found)
            findings += _check_section_metrics(
                document, found, metrics_at[position], contract, spec
            )
            body = "\n".join(part.text for part in found.blocks if part.text)
            findings += _check_assertions(body, spec.assertions, document, contract, found)

    findings += _check_ordering(document, contract, located)
    return findings
def _matching_sections(
    sections: list[Section], section_spec: SectionSpec
) -> list[tuple[int, Section]]:
    """Return ``(index, section)`` pairs whose heading matches the specification.

    Headings are compared after normalisation. A specification without
    headings matches nothing.
    """
    wanted = {_normalize_heading(title) for title in section_spec.headings}
    hits: list[tuple[int, Section]] = []
    if wanted:
        for position, candidate in enumerate(sections):
            if _normalize_heading(candidate.heading.text) in wanted:
                hits.append((position, candidate))
    return hits
def _check_section_presence(
    document: Document,
    contract: DocumentContract,
    section_spec: SectionSpec,
    matches: list[tuple[int, Section]],
) -> list[Diagnostic]:
    """Report presence violations for one section specification.

    Present sections are flagged when forbidden (error) or discouraged
    (warning). Absent sections are flagged when required (error) or
    recommended (warning). Every other combination yields no diagnostics.
    """

    def finding(severity: str, code: str, message: str, **extra: Any) -> Diagnostic:
        return Diagnostic(
            severity=severity,
            code=code,
            message=message,
            contract=_contract_location(contract),
            rule_id=section_spec.id,
            **extra,
        )

    name = section_spec.id

    if matches:
        if section_spec.presence == "forbidden":
            first = matches[0][1]
            return [
                finding(
                    "error",
                    "contract.section.forbidden",
                    f"Forbidden section `{name}` is present.",
                    source=SourceLocation(path=document.source_path, line=first.heading.line),
                    guidance=f"Remove the `{first.heading.text}` section.",
                )
            ]
        if section_spec.presence == "discouraged":
            first = matches[0][1]
            return [
                finding(
                    "warning",
                    "contract.section.discouraged",
                    f"Discouraged section `{name}` is present.",
                    source=SourceLocation(path=document.source_path, line=first.heading.line),
                )
            ]
        return []

    if section_spec.presence == "required":
        return [
            finding(
                "error",
                "contract.section.missing",
                f"Required section `{name}` is missing.",
                source=SourceLocation(path=document.source_path),
                guidance=_section_guidance(section_spec),
            )
        ]
    if section_spec.presence == "recommended":
        return [
            finding(
                "warning",
                "contract.section.recommended_missing",
                f"Recommended section `{name}` is missing.",
                source=SourceLocation(path=document.source_path),
                guidance=_section_guidance(section_spec),
            )
        ]
    return []
def _check_section_level(
    document: Document,
    contract: DocumentContract,
    section_spec: SectionSpec,
    section: Section,
) -> list[Diagnostic]:
    """Report a heading whose level differs from the one the specification requires.

    Specifications without a level impose no constraint.
    """
    wanted = section_spec.level
    if wanted is None:
        return []
    actual = section.heading.level
    if actual == wanted:
        return []

    wrong_level = Diagnostic(
        severity="error",
        code="contract.section.level",
        message=(
            f"Section `{section_spec.id}` must use heading level "
            f"{wanted}, got {actual}."
        ),
        source=SourceLocation(path=document.source_path, line=section.heading.line),
        contract=_contract_location(contract),
        rule_id=section_spec.id,
        guidance=f"Change the heading to {'#' * wanted} {section.heading.text}.",
    )
    return [wrong_level]
def _check_section_metrics(
    document: Document,
    section: Section,
    section_metrics: SectionMetrics,
    contract: DocumentContract,
    section_spec: SectionSpec,
) -> list[Diagnostic]:
    """Check a section's metric bands against the values measured for it."""
    heading = section.heading
    where = SourceLocation(path=document.source_path, line=heading.line)
    return _check_bands(
        section_spec.metrics,
        section_metrics.to_dict(),
        source=where,
        contract=contract,
        subject=f"section `{heading.text}`",
        rule_id=section_spec.id,
    )
def _check_ordering(
    document: Document,
    contract: DocumentContract,
    matches_by_id: dict[str, list[tuple[int, Section]]],
) -> list[Diagnostic]:
    """Report sections that violate their ``order_before`` / ``order_after`` rules.

    Only the first match of each section is considered. Targets with no match
    in the document are ignored.
    """
    findings: list[Diagnostic] = []

    for spec in contract.sections:
        own = matches_by_id.get(spec.id) if spec.id else None
        if not own:
            continue
        position, anchor = own[0]

        def out_of_order(message: str) -> Diagnostic:
            # Located at the heading of the misplaced section itself.
            return Diagnostic(
                severity="error",
                code="contract.section.order",
                message=message,
                source=SourceLocation(path=document.source_path, line=anchor.heading.line),
                contract=_contract_location(contract),
                rule_id=spec.id,
            )

        for target in spec.order_before:
            other = matches_by_id.get(target)
            if other and position > other[0][0]:
                findings.append(
                    out_of_order(f"Section `{spec.id}` must appear before `{target}`.")
                )
        for target in spec.order_after:
            other = matches_by_id.get(target)
            if other and position < other[0][0]:
                findings.append(
                    out_of_order(f"Section `{spec.id}` must appear after `{target}`.")
                )
    return findings
def _check_bands(
    bands: list[MetricBand],
    values: dict[str, Any],
    *,
    source: SourceLocation,
    contract: DocumentContract,
    subject: str,
    rule_id: str | None = None,
) -> list[Diagnostic]:
    """Compare measured values against metric bands.

    Args:
        bands: Bands to evaluate; a band whose normalised metric name is not
            in *values* is skipped.
        values: Measured values keyed by normalised metric name.
        source: Document location attached to each diagnostic.
        contract: The contract the bands belong to.
        subject: Human-readable name of the measured thing, used in messages.
        rule_id: Fallback rule id for bands that carry none.

    Returns:
        One diagnostic per violated bound, with the severity the band assigns
        to that bound.
    """
    findings: list[Diagnostic] = []

    for band in bands:
        name = normalize_metric_name(band.metric)
        if name not in values:
            continue
        actual = values[name]

        def breach(bound: str, code: str, message: str, limit: Any) -> Diagnostic:
            return Diagnostic(
                severity=band.severity_for(bound),
                code=code,
                message=message,
                source=source,
                contract=_contract_location(contract),
                rule_id=band.rule_id or rule_id,
                guidance=band.guidance,
                details={"metric": name, "actual": actual, bound: limit},
            )

        if band.min is not None and actual < band.min:
            findings.append(
                breach(
                    "min",
                    "contract.metric.too_low",
                    f"{subject} has {actual} {name}; expected at least {band.min}.",
                    band.min,
                )
            )
        if band.max is not None and actual > band.max:
            findings.append(
                breach(
                    "max",
                    "contract.metric.too_high",
                    f"{subject} has {actual} {name}; expected at most {band.max}.",
                    band.max,
                )
            )
    return findings
def _check_assertions(
    text: str,
    assertions: list[AssertionSpec],
    document: Document,
    contract: DocumentContract,
    section: Section | None = None,
) -> list[Diagnostic]:
    """Evaluate each assertion against *text* and gather the diagnostics.

    Diagnostics are located at the section heading when *section* is given,
    otherwise at line 1 of the document.
    """
    line = section.heading.line if section else 1
    findings: list[Diagnostic] = []
    for rule in assertions:
        where = SourceLocation(path=document.source_path, line=line)
        findings += _check_assertion(text, rule, source=where, contract=contract)
    return findings
def _check_assertion(
    text: str,
    assertion: AssertionSpec,
    *,
    source: SourceLocation,
    contract: DocumentContract,
) -> list[Diagnostic]:
    """Evaluate one assertion against *text*.

    Substring conditions (``contains``, ``contains_any``, ``not_contains``)
    are compared lower-cased unless the assertion is case sensitive. Regular
    expressions (``matches``, ``not_matches``) are searched in multi-line mode
    and ignore case under the same rule. The assertion's own message, when
    set, replaces the default message of every diagnostic.
    """
    exact = assertion.case_sensitive

    def fold(value: str) -> str:
        return value if exact else value.lower()

    def report(code: str, default_message: str, details: dict[str, Any]) -> Diagnostic:
        return _assertion_diagnostic(
            assertion,
            code,
            assertion.message or default_message,
            source,
            contract,
            details,
        )

    findings: list[Diagnostic] = []
    haystack = fold(text)

    for needle in assertion.contains:
        if fold(needle) not in haystack:
            findings.append(
                report(
                    "contract.assertion.contains_missing",
                    f"Expected content to contain `{needle}`.",
                    {"expected": needle},
                )
            )

    if assertion.contains_any:
        satisfied = any(fold(needle) in haystack for needle in assertion.contains_any)
        if not satisfied:
            findings.append(
                report(
                    "contract.assertion.contains_any_missing",
                    f"Expected content to contain one of {assertion.contains_any}.",
                    {"expected_any": assertion.contains_any},
                )
            )

    for needle in assertion.not_contains:
        if fold(needle) in haystack:
            findings.append(
                report(
                    "contract.assertion.forbidden_content",
                    f"Content must not contain `{needle}`.",
                    {"forbidden": needle},
                )
            )

    # Patterns are searched in the original text; case folding is done by flag.
    flags = (0 if exact else re.IGNORECASE) | re.MULTILINE

    for pattern in assertion.matches:
        if re.search(pattern, text, flags=flags) is None:
            findings.append(
                report(
                    "contract.assertion.pattern_missing",
                    f"Expected content to match `{pattern}`.",
                    {"pattern": pattern},
                )
            )

    for pattern in assertion.not_matches:
        if re.search(pattern, text, flags=flags) is not None:
            findings.append(
                report(
                    "contract.assertion.forbidden_pattern",
                    f"Content must not match `{pattern}`.",
                    {"pattern": pattern},
                )
            )
    return findings
def _assertion_diagnostic(
    assertion: AssertionSpec,
    code: str,
    message: str,
    source: SourceLocation,
    contract: DocumentContract,
    details: dict[str, Any],
) -> Diagnostic:
    """Build the diagnostic for a failed assertion condition.

    Severity, rule id and guidance are taken from the assertion itself.
    """
    attributes: dict[str, Any] = {
        "severity": assertion.severity,
        "code": code,
        "message": message,
        "source": source,
        "contract": _contract_location(contract),
        "rule_id": assertion.id,
        "guidance": assertion.guidance,
        "details": details,
    }
    return Diagnostic(**attributes)
def _section_guidance(section_spec: SectionSpec) -> str:
    """Suggest the heading to add for a missing section.

    The label is the title, else the first accepted heading, else the id.
    The heading level defaults to 2 when the specification sets none.
    """
    if section_spec.title:
        label = section_spec.title
    elif section_spec.headings:
        label = section_spec.headings[0]
    else:
        label = section_spec.id

    depth = section_spec.level if section_spec.level else 2
    marker = "#" * depth
    return f"Add a {marker} {label} section."
def _contract_location(contract: DocumentContract) -> SourceLocation:
    """Return the source location (path and line) recorded on the contract."""
    path, line = contract.source_path, contract.source_line
    return SourceLocation(path=path, line=line)
def _normalize_heading(text: str) -> str:
    """Return *text* trimmed, lower-cased and with whitespace runs collapsed."""
    trimmed = text.strip()
    lowered = trimmed.lower()
    return re.sub(r"\s+", " ", lowered)
def _resolve_path(data: dict[str, Any], path: str) -> tuple[Any, bool]:
    """Look up a dotted *path* in nested dictionaries.

    A leading ``$.`` and then a leading ``document.`` are stripped before the
    lookup.

    Returns:
        ``(value, True)`` when every segment resolves, otherwise
        ``(None, False)``. An empty path never resolves.
    """
    if not path:
        return None, False

    segments = path.removeprefix("$.").removeprefix("document.").split(".")
    node: Any = data
    for key in segments:
        # Only dictionaries can be descended into.
        if not (isinstance(node, dict) and key in node):
            return None, False
        node = node[key]
    return node, True
def _value_matches_type(value: Any, expected_type: str) -> bool:
if expected_type == "string":
return isinstance(value, str)
if expected_type == "number":
return isinstance(value, int | float) and not isinstance(value, bool)
if expected_type == "integer":
return isinstance(value, int) and not isinstance(value, bool)
if expected_type == "boolean":
return isinstance(value, bool)
if expected_type == "array":
return isinstance(value, list)
if expected_type == "object":
return isinstance(value, dict)
if expected_type == "date":
return isinstance(value, str)
return True

View File

@@ -0,0 +1,142 @@
"""Load document contracts from Markdown files."""
from __future__ import annotations
from copy import deepcopy
from pathlib import Path
from typing import Any
import yaml
from markitect_tool.contract.model import DocumentContract
from markitect_tool.core import parse_markdown
class ContractLoaderError(ValueError):
    """Base error for any failure while loading a contract file."""
class ContractNotFoundError(ContractLoaderError):
    """Signals that a Markdown file contains no contract definition."""
class InvalidContractFormatError(ContractLoaderError):
    """Signals a contract definition that is not valid YAML or not a mapping."""
def load_contract_file(path: str | Path) -> DocumentContract:
    """Read a Markdown contract file as UTF-8 and load the contract it holds.

    The file path, as a string, is recorded as the contract's source path.
    """
    contract_path = Path(path)
    return load_contract_text(
        contract_path.read_text(encoding="utf-8"),
        source_path=str(contract_path),
    )
def load_contract_text(text: str, source_path: str | None = None) -> DocumentContract:
    """Build a ``DocumentContract`` from Markdown text.

    The definition is the frontmatter ``contract`` mapping merged with the
    fenced YAML contract block (block values win). When neither yields
    anything, the remaining frontmatter is used if it looks like a contract.

    Raises:
        InvalidContractFormatError: frontmatter ``contract`` is not a mapping.
        ContractNotFoundError: no contract definition could be found.
    """
    document = parse_markdown(text, source_path=source_path)
    frontmatter = document.frontmatter

    declared = frontmatter.get("contract")
    if not (declared is None or isinstance(declared, dict)):
        raise InvalidContractFormatError("Frontmatter `contract` must be a mapping")

    # NOTE(review): block_line comes from the fence token's own line map, which
    # looks relative to the Markdown body rather than the whole file — confirm
    # whether a frontmatter offset should be added.
    block_data, block_line = _extract_contract_block(document.tokens, source_path)
    definition = _merge_contracts(declared or {}, block_data or {})

    # Everything in the frontmatter except the contract itself is metadata.
    metadata = {key: frontmatter[key] for key in frontmatter if key != "contract"}

    if not definition:
        if not _looks_like_contract(metadata):
            raise ContractNotFoundError(
                "No contract definition found. Add a fenced ```yaml contract block."
            )
        definition = deepcopy(metadata)

    return DocumentContract.from_mapping(
        definition,
        metadata=metadata,
        source_path=source_path,
        source_line=block_line,
    )
def _extract_contract_block(
    tokens: list[dict[str, Any]], source_path: str | None
) -> tuple[dict[str, Any] | None, int | None]:
    """Find the fenced YAML block that holds the contract definition.

    Only ``fence`` tokens whose info string names ``yaml`` or ``yml`` are
    considered. A block whose info string also contains ``contract`` is
    explicit: the first such block is returned, and problems in other YAML
    fences (for example an illustrative list) do not prevent loading.
    Without an explicit block, every YAML fence must be a valid mapping and
    the first one that looks like a contract is returned.

    Returns:
        ``(data, line)`` for the chosen block, or ``(None, None)`` when no
        block qualifies. ``line`` is derived from the token's line map.

    Raises:
        InvalidContractFormatError: the explicit block is malformed or not a
            mapping, or no explicit block exists and some YAML fence is.
    """
    candidates: list[tuple[dict[str, Any], int | None]] = []
    first_error: InvalidContractFormatError | None = None

    for token in tokens:
        if token.get("type") != "fence":
            continue
        info = str(token.get("info", "")).strip().lower()
        if not _is_yaml_info(info):
            continue
        explicit = "contract" in info.split()
        try:
            data = _load_yaml_mapping(token.get("content", ""), source_path)
        except InvalidContractFormatError as exc:
            if explicit:
                raise
            # Remember the problem, but let a later explicit block win.
            if first_error is None:
                first_error = exc
            continue
        if explicit:
            return data, _token_line(token)
        candidates.append((data, _token_line(token)))

    # No explicit block: stay strict and report the first unusable YAML fence.
    if first_error is not None:
        raise first_error
    for data, line in candidates:
        if _looks_like_contract(data):
            return data, line
    return None, None


def _load_yaml_mapping(raw_yaml: str, source_path: str | None) -> dict[str, Any]:
    """Parse one fenced block as a YAML mapping; blank or null blocks give ``{}``."""
    try:
        data = yaml.safe_load(raw_yaml) if raw_yaml.strip() else {}
    except yaml.YAMLError as exc:
        raise InvalidContractFormatError(
            f"Invalid YAML contract block in {source_path or '<string>'}: {exc}"
        ) from exc
    if data is None:
        return {}
    if not isinstance(data, dict):
        raise InvalidContractFormatError("Contract YAML block must be a mapping")
    return data
def _is_yaml_info(info: str) -> bool:
parts = info.split()
return "yaml" in parts or "yml" in parts
def _token_line(token: dict[str, Any]) -> int | None:
token_map = token.get("map")
if not token_map:
return None
return int(token_map[0]) + 1
def _looks_like_contract(data: dict[str, Any]) -> bool:
return any(
key in data
for key in {
"document",
"document_type",
"document-type",
"sections",
"fields",
"metrics",
"metric_bands",
"assertions",
"forms",
"rubrics",
}
)
def _merge_contracts(
frontmatter_contract: dict[str, Any], block_contract: dict[str, Any]
) -> dict[str, Any]:
merged = deepcopy(frontmatter_contract)
for key, value in block_contract.items():
if (
isinstance(value, dict)
and isinstance(merged.get(key), dict)
):
nested = deepcopy(merged[key])
nested.update(value)
merged[key] = nested
else:
merged[key] = value
return merged

View File

@@ -0,0 +1,127 @@
"""Metric extraction for parsed Markdown documents."""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from typing import Any
from markitect_tool.core import Document, Section
# A word is a run of Unicode letters/digits (underscore excluded), optionally
# joined by a hyphen or an apostrophe (ASCII ' or typographic U+2019). This
# keeps "well-known" and "it's" as one word each and stops accented words such
# as "café" from being split at the non-ASCII letter.
WORD_RE = re.compile(r"[^\W_]+(?:[-'\u2019][^\W_]+)*")
# A sentence end is a run of ., ! or ? followed by whitespace or end of text.
SENTENCE_RE = re.compile(r"[.!?]+(?:\s|$)")
# A list item is a line that starts with -, + or *, or a number plus . or ).
LIST_ITEM_RE = re.compile(r"^\s*(?:[-+*]|\d+[.)])\s+", re.MULTILINE)
@dataclass(frozen=True)
class SectionMetrics:
    """Size and structure counts for a single heading-led section."""

    heading: str
    line: int
    level: int
    characters: int
    words: int
    sentences: int
    paragraphs: int
    sections: int = 1
    headings: int = 1
    list_items: int = 0
    code_blocks: int = 0
    nesting_depth: int = 1

    def to_dict(self) -> dict[str, Any]:
        """Return every metric as a plain dictionary, in declaration order."""
        names = (
            "heading",
            "line",
            "level",
            "characters",
            "words",
            "sentences",
            "paragraphs",
            "sections",
            "headings",
            "list_items",
            "code_blocks",
            "nesting_depth",
        )
        return {name: getattr(self, name) for name in names}
@dataclass(frozen=True)
class DocumentMetrics:
    """Whole-document counts plus the per-section metrics."""

    characters: int
    words: int
    sentences: int
    paragraphs: int
    sections: int
    headings: int
    list_items: int
    code_blocks: int
    max_heading_depth: int
    section_metrics: list[SectionMetrics] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Return ``{"document": totals, "sections": [per-section dicts]}``."""
        total_names = (
            "characters",
            "words",
            "sentences",
            "paragraphs",
            "sections",
            "headings",
            "list_items",
            "code_blocks",
            "max_heading_depth",
        )
        totals = {name: getattr(self, name) for name in total_names}
        per_section = [section.to_dict() for section in self.section_metrics]
        return {"document": totals, "sections": per_section}
def collect_metrics(document: Document) -> DocumentMetrics:
    """Compute document-wide totals and one metrics record per section.

    Character, word, sentence and list-item counts are taken from the
    stripped document body; paragraph and code-block counts come from the
    parsed blocks.
    """
    per_section = [_section_metrics(section) for section in document.sections]
    body = document.body.strip()
    block_types = [block.type for block in document.blocks]
    heading_levels = (heading.level for heading in document.headings)
    return DocumentMetrics(
        characters=len(body),
        words=count_words(body),
        sentences=count_sentences(body),
        paragraphs=block_types.count("paragraph"),
        sections=len(document.sections),
        headings=len(document.headings),
        list_items=count_list_items(body),
        code_blocks=block_types.count("code"),
        # A document without headings has depth 0.
        max_heading_depth=max(heading_levels, default=0),
        section_metrics=per_section,
    )
def count_words(text: str) -> int:
    """Return how many ``WORD_RE`` matches ``text`` contains."""
    return sum(1 for _ in WORD_RE.finditer(text))
def count_sentences(text: str) -> int:
    """Return how many ``SENTENCE_RE`` matches ``text`` contains."""
    return sum(1 for _ in SENTENCE_RE.finditer(text))
def count_list_items(text: str) -> int:
    """Return how many ``LIST_ITEM_RE`` matches ``text`` contains."""
    return sum(1 for _ in LIST_ITEM_RE.finditer(text))
def _section_metrics(section: Section) -> SectionMetrics:
    """Compute the metrics record for one section.

    Text counts use the section's non-empty block texts joined by newlines;
    the nesting depth is the section heading's level.
    """
    heading = section.heading
    pieces = [block.text for block in section.blocks if block.text]
    text = "\n".join(pieces).strip()
    block_types = [block.type for block in section.blocks]
    return SectionMetrics(
        heading=heading.text,
        line=heading.line,
        level=heading.level,
        characters=len(text),
        words=count_words(text),
        sentences=count_sentences(text),
        paragraphs=block_types.count("paragraph"),
        list_items=count_list_items(text),
        code_blocks=block_types.count("code"),
        nesting_depth=heading.level,
    )

View File

@@ -0,0 +1,364 @@
"""Markdown-native document contract model."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
# The presence levels that normalize_presence() can produce for a section.
PRESENCE_VALUES = {
    "required",
    "recommended",
    "optional",
    "discouraged",
    "forbidden",
}
FIELD_TYPES = {"string", "number", "integer", "boolean", "array", "object", "date"}

# Canonical metric name followed by every spelling that maps to it.
_METRIC_SPELLINGS = (
    ("characters", ("char", "chars", "character", "characters")),
    ("words", ("word", "words", "word_count")),
    ("sentences", ("sentence", "sentences")),
    ("paragraphs", ("paragraph", "paragraphs")),
    ("sections", ("section", "sections")),
    ("headings", ("heading", "headings")),
    ("list_items", ("list_item", "list_items")),
    ("code_blocks", ("code_block", "code_blocks")),
    ("max_heading_depth", ("max_heading_depth", "heading_depth")),
    ("nesting_depth", ("nesting_depth",)),
)
METRIC_ALIASES = {
    alias: canonical
    for canonical, aliases in _METRIC_SPELLINGS
    for alias in aliases
}
METRIC_NAMES = set(METRIC_ALIASES.values())
@dataclass(frozen=True)
class MetricBand:
    """Lower and/or upper target for one metric, with the severity to report."""

    metric: str
    min: float | None = None
    max: float | None = None
    severity: str = "warning"
    min_severity: str | None = None
    max_severity: str | None = None
    rule_id: str | None = None
    guidance: str | None = None
    raw: Any = field(default_factory=dict)

    @classmethod
    def from_mapping(cls, metric: str, raw: Any) -> "MetricBand":
        """Create a band for ``metric`` from its raw contract value.

        The metric name is normalized first. A non-mapping ``raw`` yields a
        band with no bounds that only records the raw value.
        """
        name = normalize_metric_name(metric)
        if isinstance(raw, dict):
            return cls(
                metric=name,
                min=raw.get("min"),
                max=raw.get("max"),
                severity=str(raw.get("severity", "warning")),
                min_severity=raw.get("min_severity"),
                max_severity=raw.get("max_severity"),
                rule_id=raw.get("id") or raw.get("rule_id"),
                guidance=raw.get("guidance"),
                raw=raw,
            )
        return cls(metric=name, raw=raw)

    def severity_for(self, bound: str) -> str:
        """Return the severity for ``"min"`` or ``"max"``.

        A bound-specific severity wins when set; otherwise, and for any other
        ``bound`` value, the general severity is returned.
        """
        overrides = {"min": self.min_severity, "max": self.max_severity}
        return overrides.get(bound) or self.severity
@dataclass(frozen=True)
class AssertionSpec:
    """Text assertion: required or forbidden substrings and patterns."""

    id: str | None = None
    message: str | None = None
    severity: str = "error"
    guidance: str | None = None
    contains: list[str] = field(default_factory=list)
    contains_any: list[str] = field(default_factory=list)
    not_contains: list[str] = field(default_factory=list)
    matches: list[str] = field(default_factory=list)
    not_matches: list[str] = field(default_factory=list)
    case_sensitive: bool = False
    raw: Any = field(default_factory=dict)

    @classmethod
    def from_mapping(cls, raw: Any) -> "AssertionSpec":
        """Create an assertion from its raw contract value.

        Alternative key spellings are accepted (``rule_id``,
        ``contains_any_of``, ``forbid``, ``pattern``, ``forbid_pattern``). A
        non-mapping ``raw`` yields a default spec that only records the raw
        value.
        """
        if not isinstance(raw, dict):
            return cls(raw=raw)

        def strings(primary: str, fallback: str | None = None) -> list[str]:
            # Read the primary key, falling back to the alias when it is falsy.
            value = raw.get(primary)
            if fallback is not None:
                value = value or raw.get(fallback)
            return as_string_list(value)

        return cls(
            id=raw.get("id") or raw.get("rule_id"),
            message=raw.get("message"),
            severity=str(raw.get("severity", "error")),
            guidance=raw.get("guidance"),
            contains=strings("contains"),
            contains_any=strings("contains_any", "contains_any_of"),
            not_contains=strings("not_contains", "forbid"),
            matches=strings("matches", "pattern"),
            not_matches=strings("not_matches", "forbid_pattern"),
            case_sensitive=bool(raw.get("case_sensitive", False)),
            raw=raw,
        )
@dataclass(frozen=True)
class FieldSpec:
    """Description of one structured value and the constraints placed on it."""

    id: str | None
    path: str | None = None
    type: str | None = None
    required: bool = False
    label: str | None = None
    description: str | None = None
    enum: list[Any] | None = None
    pattern: str | None = None
    min: float | None = None
    max: float | None = None
    min_length: int | None = None
    max_length: int | None = None
    default: Any = None
    source: str | None = None
    raw: Any = field(default_factory=dict)

    @classmethod
    def from_mapping(cls, raw: Any, fallback_id: str | None = None) -> "FieldSpec":
        """Create a field spec from its raw contract value.

        The id is taken from ``id``, then ``name``, then ``fallback_id``. When
        no ``path`` is given it defaults to ``frontmatter.<id>``. A
        non-mapping ``raw`` yields a spec carrying only the fallback id and
        the raw value.
        """
        if not isinstance(raw, dict):
            return cls(id=fallback_id, raw=raw)

        field_id = raw.get("id") or raw.get("name") or fallback_id
        default_path = f"frontmatter.{field_id}" if field_id else None
        # These keys are copied into the spec unchanged.
        copied_keys = (
            "type",
            "label",
            "description",
            "enum",
            "pattern",
            "min",
            "max",
            "min_length",
            "max_length",
            "default",
            "source",
        )
        copied = {key: raw.get(key) for key in copied_keys}
        return cls(
            id=field_id,
            path=raw.get("path") or default_path,
            required=bool(raw.get("required", False)),
            raw=raw,
            **copied,
        )
@dataclass(frozen=True)
class SectionSpec:
    """What a contract expects of one Markdown section."""

    id: str | None
    title: str | None = None
    section_type: str | None = None
    presence: str = "optional"
    headings: list[str] = field(default_factory=list)
    level: int | None = None
    order_before: list[str] = field(default_factory=list)
    order_after: list[str] = field(default_factory=list)
    metrics: list[MetricBand] = field(default_factory=list)
    assertions: list[AssertionSpec] = field(default_factory=list)
    raw: Any = field(default_factory=dict)

    @classmethod
    def from_mapping(cls, raw: Any, fallback_id: str | None = None) -> "SectionSpec":
        """Create a section spec from its raw contract value.

        Accepted headings are gathered, de-duplicated, from ``headings``,
        ``aliases``, the same two keys under ``match``, the title and the id.
        Ordering may be given as top-level ``before``/``after`` or under
        ``order``. A non-mapping ``raw`` yields a spec carrying only the
        fallback id and the raw value.
        """
        if not isinstance(raw, dict):
            return cls(id=fallback_id, raw=raw)

        section_id = raw.get("id") or fallback_id

        match = raw.get("match")
        if not isinstance(match, dict):
            match = {}
        order = raw.get("order")
        if not isinstance(order, dict):
            order = {}

        heading_sources = (
            raw.get("headings"),
            raw.get("aliases"),
            match.get("headings"),
            match.get("aliases"),
            raw.get("title"),
            section_id,
        )
        candidates: list[str] = []
        for source in heading_sources:
            candidates += as_string_list(source)

        return cls(
            id=section_id,
            title=raw.get("title"),
            section_type=raw.get("section_type") or raw.get("type") or raw.get("role"),
            presence=normalize_presence(raw),
            headings=unique_strings(candidates),
            level=raw.get("level"),
            order_before=as_string_list(raw.get("before") or order.get("before")),
            order_after=as_string_list(raw.get("after") or order.get("after")),
            metrics=metric_bands_from_mapping(raw.get("metrics")),
            assertions=assertions_from_value(raw.get("assertions")),
            raw=raw,
        )
@dataclass(frozen=True)
class DocumentContract:
    """Parsed contract for one type of Markdown document."""

    id: str | None
    document_type: str | None
    title: str | None = None
    version: str | None = None
    description: str | None = None
    sections: list[SectionSpec] = field(default_factory=list)
    fields: list[FieldSpec] = field(default_factory=list)
    metrics: list[MetricBand] = field(default_factory=list)
    assertions: list[AssertionSpec] = field(default_factory=list)
    forms: list[dict[str, Any]] = field(default_factory=list)
    context: dict[str, Any] = field(default_factory=dict)
    rubrics: list[dict[str, Any]] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)
    raw: dict[str, Any] = field(default_factory=dict)
    source_path: str | None = None
    source_line: int | None = None

    @classmethod
    def from_mapping(
        cls,
        raw: dict[str, Any],
        *,
        metadata: dict[str, Any] | None = None,
        source_path: str | None = None,
        source_line: int | None = None,
    ) -> "DocumentContract":
        """Create a contract from its raw mapping and surrounding metadata.

        Identity values (id, document type, title, version) are looked up in
        ``raw`` first, then in the nested ``document`` mapping where
        applicable, then in ``metadata``. Document metrics come from
        ``metrics.document`` when that is a mapping, otherwise from
        ``metrics`` or ``metric_bands``. ``forms``, ``context`` and
        ``rubrics`` are kept only when they have the expected container type.
        """
        metadata = metadata or {}

        document = raw.get("document")
        if not isinstance(document, dict):
            document = {}

        metrics_value = raw.get("metrics")
        if isinstance(metrics_value, dict) and isinstance(
            metrics_value.get("document"), dict
        ):
            metric_source = metrics_value["document"]
        else:
            metric_source = metrics_value or raw.get("metric_bands")

        # An absent or empty version is stored as None, anything else as text.
        version_text = str(raw.get("version") or metadata.get("version") or "")

        forms = raw.get("forms")
        context = raw.get("context")
        rubrics = raw.get("rubrics")

        return cls(
            id=raw.get("id") or metadata.get("contract-id") or metadata.get("id"),
            document_type=(
                raw.get("document_type")
                or raw.get("document-type")
                or raw.get("type")
                or document.get("type")
                or metadata.get("document-type")
            ),
            title=raw.get("title") or document.get("title") or metadata.get("title"),
            version=version_text or None,
            description=raw.get("description") or document.get("description"),
            sections=sections_from_value(raw.get("sections")),
            fields=fields_from_value(raw.get("fields")),
            metrics=metric_bands_from_mapping(metric_source),
            assertions=assertions_from_value(raw.get("assertions")),
            forms=forms if isinstance(forms, list) else [],
            context=context if isinstance(context, dict) else {},
            rubrics=rubrics if isinstance(rubrics, list) else [],
            metadata=metadata,
            raw=raw,
            source_path=source_path,
            source_line=source_line,
        )

    def to_dict(self) -> dict[str, Any]:
        """Return a dictionary view of the contract.

        Sections, fields, metrics and assertions are emitted as their raw
        contract values; ``metadata``, ``raw`` and ``source_line`` are not
        included.
        """

        def raws(specs: list[Any]) -> list[Any]:
            return [spec.raw for spec in specs]

        return {
            "id": self.id,
            "document_type": self.document_type,
            "title": self.title,
            "version": self.version,
            "description": self.description,
            "sections": raws(self.sections),
            "fields": raws(self.fields),
            "metrics": raws(self.metrics),
            "assertions": raws(self.assertions),
            "forms": self.forms,
            "context": self.context,
            "rubrics": self.rubrics,
            "source_path": self.source_path,
        }
def normalize_metric_name(metric: str) -> str:
    """Return the canonical name for ``metric``.

    The name is trimmed and lower-cased, then mapped through
    ``METRIC_ALIASES``; unknown names are returned in their cleaned form.
    """
    cleaned = str(metric).strip().lower()
    return METRIC_ALIASES.get(cleaned, cleaned)
def normalize_presence(raw: dict[str, Any]) -> str:
    """Derive a section's presence level from its raw mapping.

    An explicit truthy ``presence`` value wins and is returned as text.
    Otherwise the boolean flags are checked from most to least restrictive
    (forbidden/prohibited, discouraged, required, recommended); only a literal
    ``True`` counts. The default is ``"optional"``.
    """
    explicit = raw.get("presence")
    if explicit:
        return str(explicit)

    flag_groups = (
        ("forbidden", ("forbidden", "prohibited")),
        ("discouraged", ("discouraged",)),
        ("required", ("required",)),
        ("recommended", ("recommended",)),
    )
    for presence, flags in flag_groups:
        if any(raw.get(flag) is True for flag in flags):
            return presence
    return "optional"
def sections_from_value(value: Any) -> list[SectionSpec]:
    """Turn a raw ``sections`` value into a list of section specs."""
    specs: list[SectionSpec] = []
    for fallback_id, item in items_from_value(value):
        specs.append(SectionSpec.from_mapping(item, fallback_id=fallback_id))
    return specs
def fields_from_value(value: Any) -> list[FieldSpec]:
    """Turn a raw ``fields`` value into a list of field specs."""
    specs: list[FieldSpec] = []
    for fallback_id, item in items_from_value(value):
        specs.append(FieldSpec.from_mapping(item, fallback_id=fallback_id))
    return specs
def assertions_from_value(value: Any) -> list[AssertionSpec]:
    """Turn a raw ``assertions`` value into a list of assertion specs.

    ``None`` gives an empty list; a single non-list value is treated as a
    one-item list.
    """
    if value is None:
        return []
    if not isinstance(value, list):
        value = [value]
    specs: list[AssertionSpec] = []
    for item in value:
        specs.append(AssertionSpec.from_mapping(item))
    return specs
def metric_bands_from_mapping(value: Any) -> list[MetricBand]:
    """Turn a raw ``metrics`` value into a list of metric bands.

    A mapping gives one band per metric and ``None`` gives an empty list. Any
    other value gives a single band named ``<invalid>`` that carries the
    value as its raw data.
    """
    if isinstance(value, dict):
        return [MetricBand.from_mapping(name, spec) for name, spec in value.items()]
    if value is None:
        return []
    return [MetricBand.from_mapping("<invalid>", value)]
def items_from_value(value: Any) -> list[tuple[str | None, Any]]:
    """Normalize a raw collection into ``(fallback_id, item)`` pairs.

    Mapping keys become the fallback ids (as text). List items and single
    values get ``None`` as their fallback id, and ``None`` gives no pairs.
    """
    if value is None:
        return []
    if isinstance(value, dict):
        return [(str(key), item) for key, item in value.items()]
    entries = value if isinstance(value, list) else [value]
    return [(None, entry) for entry in entries]
def as_string_list(value: Any) -> list[str]:
    """Coerce ``value`` to a list of strings.

    ``None`` gives an empty list, a list has its ``None`` items dropped and
    the rest converted to text, and any other value becomes a one-item list.
    """
    if value is None:
        return []
    items = value if isinstance(value, list) else [value]
    return [str(item) for item in items if item is not None]
def unique_strings(values: list[str]) -> list[str]:
    """Return the trimmed, non-empty values without case-insensitive repeats.

    The first spelling of each value is kept and the original order is
    preserved.
    """
    first_seen: dict[str, str] = {}
    for value in values:
        trimmed = value.strip()
        if trimmed:
            # setdefault keeps the earliest spelling for each lower-cased key.
            first_seen.setdefault(trimmed.lower(), trimmed)
    return list(first_seen.values())

View File

@@ -29,7 +29,7 @@ def parse_markdown(markdown: str, source_path: str | None = None) -> Document:
frontmatter, body, body_line_offset = _split_frontmatter(markdown)
tokens = _parse_tokens(body)
blocks, headings = _blocks_and_headings(tokens, body_line_offset)
blocks, headings = _blocks_and_headings(tokens, body_line_offset, body)
sections = _sections_from_blocks(blocks, headings)
return Document(
source_path=source_path,
@@ -97,7 +97,7 @@ def _token_to_dict(token: Token) -> dict[str, Any]:
def _blocks_and_headings(
tokens: list[dict[str, Any]], line_offset: int
tokens: list[dict[str, Any]], line_offset: int, markdown: str
) -> tuple[list[ContentBlock], list[Heading]]:
blocks: list[ContentBlock] = []
headings: list[Heading] = []
@@ -126,6 +126,8 @@ def _blocks_and_headings(
if not text and token_type.endswith("_open"):
inline = _next_inline(tokens, index)
text = inline.get("content", "") if inline else ""
if not text:
text = _source_text(token, line_offset, markdown)
blocks.append(
ContentBlock(
type=_block_type(token_type),
@@ -151,6 +153,16 @@ def _line_range(token: dict[str, Any], line_offset: int) -> tuple[int | None, in
return line_map[0] + line_offset + 1, line_map[1] + line_offset
def _source_text(token: dict[str, Any], line_offset: int, markdown: str) -> str:
    """Return the stripped source lines that ``token`` covers in ``markdown``.

    The token's line range is converted back to indexes into ``markdown`` by
    removing ``line_offset``. A token without a line range gives ``""``.
    """
    first_line, last_line = _line_range(token, line_offset)
    if first_line is None or last_line is None:
        return ""
    source_lines = markdown.splitlines()
    begin = max(first_line - line_offset - 1, 0)
    # Never let the slice end fall before its start.
    end = max(last_line - line_offset, begin)
    covered = source_lines[begin:end]
    return "\n".join(covered).strip()
def _block_type(token_type: str) -> str:
return {
"paragraph_open": "paragraph",

View File

@@ -0,0 +1,65 @@
"""Shared diagnostic primitives for Markitect validation layers."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
# Severity names accepted by valid_severity().
SEVERITIES = {
    "info",
    "warning",
    "error",
}
@dataclass(frozen=True)
class SourceLocation:
    """Position in a document or contract: path, line and column, all optional."""

    path: str | None = None
    line: int | None = None
    column: int | None = None

    def to_dict(self) -> dict[str, Any]:
        """Return the location as a dictionary, leaving out unset parts."""
        result: dict[str, Any] = {}
        for name in ("path", "line", "column"):
            value = getattr(self, name)
            if value is not None:
                result[name] = value
        return result
@dataclass(frozen=True)
class Diagnostic:
    """One structured finding produced by validation or assessment."""

    severity: str
    code: str
    message: str
    source: SourceLocation | None = None
    contract: SourceLocation | None = None
    rule_id: str | None = None
    guidance: str | None = None
    details: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the finding as a dictionary without ``None`` entries.

        Locations are expanded through their own ``to_dict``; an empty
        ``details`` mapping is left out.
        """
        source = self.source.to_dict() if self.source else None
        contract = self.contract.to_dict() if self.contract else None
        entries = (
            ("severity", self.severity),
            ("code", self.code),
            ("message", self.message),
            ("source", source),
            ("contract", contract),
            ("rule_id", self.rule_id),
            ("guidance", self.guidance),
            ("details", self.details or None),
        )
        return {key: value for key, value in entries if value is not None}
def valid_severity(severity: str | None) -> bool:
    """Report whether ``severity`` is one of the names in ``SEVERITIES``."""
    is_known = severity in SEVERITIES
    return is_known
def has_error(diagnostics: list[Diagnostic]) -> bool:
    """Report whether any diagnostic in the list has severity ``"error"``."""
    for diagnostic in diagnostics:
        if diagnostic.severity == "error":
            return True
    return False

View File

@@ -9,6 +9,7 @@ from typing import Any
from jsonschema import Draft202012Validator, SchemaError, ValidationError
from markitect_tool.core import Document, parse_markdown_file
from markitect_tool.diagnostics import Diagnostic, SourceLocation
from markitect_tool.schema.loader import MarkdownSchema, load_schema_file
@@ -23,6 +24,21 @@ class ValidationViolation:
def to_dict(self) -> dict[str, str]:
return asdict(self)
def to_diagnostic(
    self,
    *,
    source_path: str | None = None,
    contract_path: str | None = None,
) -> Diagnostic:
    """Convert this violation into a unified ``Diagnostic``.

    The result always has severity ``error`` and code ``schema.validation``.
    ``source_path`` and ``contract_path`` become the document and contract
    locations, and the violation's paths are carried in ``details``.
    """
    document_location = SourceLocation(path=source_path)
    schema_location = SourceLocation(path=contract_path)
    return Diagnostic(
        severity="error",
        code="schema.validation",
        message=self.message,
        source=document_location,
        contract=schema_location,
        details={"path": self.path, "schema_path": self.schema_path},
    )
@dataclass(frozen=True)
class SchemaValidationResult:
@@ -42,6 +58,17 @@ class SchemaValidationResult:
}
return {key: value for key, value in data.items() if value is not None}
def to_diagnostics(self) -> list[Diagnostic]:
    """Return every schema violation converted to a unified diagnostic.

    Each violation is given this result's document path as its source and
    this result's schema path as its contract location.
    """
    diagnostics: list[Diagnostic] = []
    for violation in self.violations:
        diagnostics.append(
            violation.to_diagnostic(
                source_path=self.document_path,
                contract_path=self.schema_path,
            )
        )
    return diagnostics
def validate_schema(schema: dict[str, Any]) -> SchemaValidationResult:
"""Validate that a JSON Schema itself is well formed."""