feat(spaces): implement Phase 0-1 of Information Space Service

Phase 0 - Project Organization:
- Create docs/PROJECT_STRUCTURE.md documenting codebase layout
- Create markitect/core/ with parser, serializer, document_manager, workspace
- Create markitect/schema/ consolidating 6 schema_*.py modules
- Create markitect/storage/ with database module
- Maintain backward compatibility via re-exports from original locations
- Add docs/roadmap/information-space-service/ with README and WORKPLAN

Phase 1 - Foundation (Weeks 1-3):
- Week 1: Core domain models (InformationSpace, SpaceDocument, SpaceConfig,
  SpaceMetadata, SpaceVariable, TransclusionReference, SpaceStatus)
- Week 2: Repository layer with interfaces (ISpaceRepository,
  IDocumentAssociationRepository, IVariableRepository, IReferenceRepository)
  and SQLite implementations with foreign key cascade deletes
- Week 3: SpaceService orchestration layer with full CRUD, document,
  variable, and reference tracking operations

Test coverage: 124 tests (25 model + 63 repository + 36 integration)

Capabilities delivered:
- CAP-001: InformationSpace entity with lifecycle management
- CAP-002: SpaceRepository CRUD with SQLite backing
- CAP-003: Document-Space associations with path-based organization
- CAP-004: Space metadata and configuration schemas
- CAP-005: Database schema with migrations

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-08 02:02:46 +01:00
parent 6ebcc0f60e
commit 9b12875681
45 changed files with 9818 additions and 4300 deletions

View File

@@ -0,0 +1,72 @@
"""
Schema management modules for MarkiTect.
This package contains the schema-related functionality:
- Validator: Validate markdown documents against JSON schemas
- Generator: Generate JSON schemas from markdown structures
- Loader: Load schemas from markdown files with embedded JSON
- Analyzer: Analyze schemas for rigidity issues
- Refiner: Refine rigid schemas with loosening rules
- Naming: Schema filename convention validation
The original top-level schema_*.py modules re-export these names
for backward compatibility.
"""
from .validator import SchemaValidator
from .generator import SchemaGenerator
from .loader import (
MarkdownSchemaLoader,
SchemaLoaderError,
InvalidSchemaFormatError,
SchemaNotFoundError,
)
from .analyzer import (
SchemaAnalyzer,
SchemaAnalysisResult,
SchemaIssue,
IssueType,
IssueSeverity,
)
from .refiner import (
SchemaRefiner,
RefinementResult,
RefinementAction,
)
from .naming import (
validate_schema_filename,
suggest_valid_filename,
extract_schema_domain,
get_schema_version,
SchemaFilenameError,
SCHEMA_FILENAME_PATTERN,
)
# Public API of the package: every name imported above from the submodules,
# grouped by the submodule it comes from.
__all__ = [
# Validator
"SchemaValidator",
# Generator
"SchemaGenerator",
# Loader
"MarkdownSchemaLoader",
"SchemaLoaderError",
"InvalidSchemaFormatError",
"SchemaNotFoundError",
# Analyzer
"SchemaAnalyzer",
"SchemaAnalysisResult",
"SchemaIssue",
"IssueType",
"IssueSeverity",
# Refiner
"SchemaRefiner",
"RefinementResult",
"RefinementAction",
# Naming
"validate_schema_filename",
"suggest_valid_filename",
"extract_schema_domain",
"get_schema_version",
"SchemaFilenameError",
"SCHEMA_FILENAME_PATTERN",
]

View File

@@ -0,0 +1,352 @@
"""
Schema Analyzer for Phase 2: Schema Refinement Tools
Analyzes JSON schemas to detect rigidity issues and provide suggestions
for improvement using the Phase 1 classification system.
"""
from pathlib import Path
from typing import Dict, Any, List, Optional, Tuple
import json
from dataclasses import dataclass, field
from enum import Enum
class IssueType(Enum):
    """Categories of rigidity problems that can be reported for a schema.

    The value of each member is its string identifier.
    """

    # A value or an array length is pinned to one exact number.
    EXACT_COUNT = "exact_count"
    # The schema has no 'x-markitect-sections' classification block.
    MISSING_CLASSIFICATIONS = "missing_classifications"
    # The schema has no 'x-markitect-content-control' block.
    MISSING_CONTENT_INSTRUCTIONS = "missing_content_instructions"
    # A large, very specific minimum item count is required.
    OVERLY_SPECIFIC = "overly_specific"
    # An integer property only allows a very narrow range.
    NO_FLEXIBILITY = "no_flexibility"
    # One of the legacy section extensions is still in use.
    DEPRECATED_EXTENSIONS = "deprecated_extensions"
class IssueSeverity(Enum):
    """How serious a reported schema issue is.

    Members are declared from least to most severe; the declaration order
    is also the iteration order of the enum.
    """

    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
@dataclass
class SchemaIssue:
    """One finding reported while analyzing a schema."""

    # Category and seriousness of the finding.
    issue_type: IssueType
    severity: IssueSeverity
    # Location in the schema the finding refers to (e.g. "root" or a
    # dotted property path such as "properties.sections").
    path: str
    # Human-readable description of the problem and of a possible remedy.
    message: str
    suggestion: str
    # Optional snapshot of the offending value and a proposed replacement.
    current_value: Any = None
    suggested_value: Any = None
@dataclass
class SchemaAnalysisResult:
    """Outcome of analyzing a single schema."""

    is_rigid: bool
    rigidity_score: int  # 0-100, higher = more rigid
    # Every issue detected, in the order it was found.
    issues: List[SchemaIssue] = field(default_factory=list)
    # Flags describing which schema features were present.
    has_classifications: bool = False
    has_content_control: bool = False
    uses_deprecated_extensions: bool = False

    @property
    def issue_count_by_severity(self) -> Dict[IssueSeverity, int]:
        """Return the number of recorded issues for each severity level.

        Every member of IssueSeverity is present as a key; levels without
        any issue map to 0.
        """
        tally = dict.fromkeys(IssueSeverity, 0)
        for found in self.issues:
            tally[found.severity] = tally[found.severity] + 1
        return tally
class SchemaAnalyzer:
    """Analyzes schemas for rigidity and suggests improvements.

    The analyzer walks a JSON schema dictionary, records a SchemaIssue for
    every rigidity pattern it recognizes, and folds the issues into a
    0-100 rigidity score.
    """

    def __init__(self):
        """Initialize the schema analyzer."""
        # Legacy top-level extension keys that analyze_schema reports as
        # deprecated when they are present in a schema.
        self.deprecated_extensions = [
            "x-markitect-required-sections",
            "x-markitect-recommended-sections",
            "x-markitect-optional-sections"
        ]

    @staticmethod
    def _is_number(value: Any) -> bool:
        """Return True when value is an int or float (bool is excluded)."""
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    def analyze_schema(self, schema: Dict[str, Any]) -> SchemaAnalysisResult:
        """
        Analyze a schema for rigidity issues.

        Args:
            schema: The JSON schema to analyze

        Returns:
            SchemaAnalysisResult with detected issues and suggestions
        """
        result = SchemaAnalysisResult(is_rigid=False, rigidity_score=0)

        # Check for Phase 1 features
        result.has_classifications = "x-markitect-sections" in schema
        result.has_content_control = "x-markitect-content-control" in schema

        # Check for deprecated extensions
        for deprecated in self.deprecated_extensions:
            if deprecated in schema:
                result.uses_deprecated_extensions = True
                result.issues.append(SchemaIssue(
                    issue_type=IssueType.DEPRECATED_EXTENSIONS,
                    severity=IssueSeverity.WARNING,
                    path=deprecated,
                    message=f"Using deprecated extension '{deprecated}'",
                    suggestion="Migrate to 'x-markitect-sections' with classification system"
                ))

        # Analyze properties for rigidity; a malformed (non-object)
        # "properties" value is ignored rather than crashing the analysis.
        if "properties" in schema and isinstance(schema["properties"], dict):
            self._analyze_properties(schema["properties"], result, "properties")

        # Check for missing classifications
        if not result.has_classifications:
            result.issues.append(SchemaIssue(
                issue_type=IssueType.MISSING_CLASSIFICATIONS,
                severity=IssueSeverity.INFO,
                path="root",
                message="Schema does not use section classification system",
                suggestion="Add 'x-markitect-sections' to classify sections as required/recommended/optional/discouraged/improper"
            ))

        # Check for missing content control
        if not result.has_content_control:
            result.issues.append(SchemaIssue(
                issue_type=IssueType.MISSING_CONTENT_INSTRUCTIONS,
                severity=IssueSeverity.INFO,
                path="root",
                message="Schema does not provide content control",
                suggestion="Add 'x-markitect-content-control' for pattern validation and quality metrics"
            ))

        # Calculate rigidity score; anything above 50 counts as rigid.
        result.rigidity_score = self._calculate_rigidity_score(result)
        result.is_rigid = result.rigidity_score > 50
        return result

    def _analyze_properties(self, properties: Dict[str, Any], result: SchemaAnalysisResult, path: str):
        """Analyze schema properties for rigidity issues.

        Appends issues to ``result.issues`` in place and recurses into
        nested ``properties`` and into ``items.properties`` of arrays.

        Args:
            properties: Mapping of property name to property definition
            result: Analysis result that collects the detected issues
            path: Dotted path of ``properties`` within the schema
        """
        for prop_name, prop_def in properties.items():
            prop_path = f"{path}.{prop_name}"
            if not isinstance(prop_def, dict):
                continue

            # Check for exact counts (const)
            if "const" in prop_def:
                result.issues.append(SchemaIssue(
                    issue_type=IssueType.EXACT_COUNT,
                    severity=IssueSeverity.WARNING,
                    path=prop_path,
                    message=f"Property '{prop_name}' requires exact value",
                    suggestion="Consider using a range or removing constraint for flexibility",
                    current_value=prop_def["const"]
                ))

            # Check for arrays with exact counts
            if prop_def.get("type") == "array":
                min_items = prop_def.get("minItems")
                max_items = prop_def.get("maxItems")
                # Non-numeric bounds (malformed schema) are skipped so the
                # arithmetic and comparisons below cannot raise TypeError.
                min_is_number = self._is_number(min_items)
                if min_is_number and self._is_number(max_items) and min_items == max_items:
                    result.issues.append(SchemaIssue(
                        issue_type=IssueType.EXACT_COUNT,
                        severity=IssueSeverity.WARNING,
                        path=prop_path,
                        message=f"Array '{prop_name}' requires exactly {min_items} items",
                        suggestion=f"Use a range like minItems: {max(0, min_items - 2)}, maxItems: {min_items + 5}",
                        current_value={"minItems": min_items, "maxItems": max_items},
                        suggested_value={
                            "minItems": max(0, min_items - 2),
                            "maxItems": min_items + 5
                        }
                    ))

                # Check for overly specific counts (large numbers)
                if min_is_number and min_items > 50:
                    rounded = (min_items // 10) * 10
                    result.issues.append(SchemaIssue(
                        issue_type=IssueType.OVERLY_SPECIFIC,
                        severity=IssueSeverity.INFO,
                        path=prop_path,
                        message=f"Array '{prop_name}' has very specific minItems: {min_items}",
                        suggestion=f"Consider rounding to {rounded} for flexibility",
                        current_value=min_items,
                        suggested_value=rounded
                    ))

            # Check for overly specific integer constraints
            if prop_def.get("type") == "integer":
                if "minimum" in prop_def and "maximum" in prop_def:
                    min_val = prop_def["minimum"]
                    max_val = prop_def["maximum"]
                    # Only numeric bounds can be subtracted; anything else
                    # is a malformed schema and is left unreported here.
                    if self._is_number(min_val) and self._is_number(max_val):
                        range_size = max_val - min_val
                        if range_size < 3:
                            result.issues.append(SchemaIssue(
                                issue_type=IssueType.NO_FLEXIBILITY,
                                severity=IssueSeverity.INFO,
                                path=prop_path,
                                message=f"Integer '{prop_name}' has very narrow range: {min_val}-{max_val}",
                                suggestion="Consider widening range for flexibility",
                                current_value={"minimum": min_val, "maximum": max_val}
                            ))

            # Recursively check nested properties
            if "properties" in prop_def and isinstance(prop_def["properties"], dict):
                self._analyze_properties(prop_def["properties"], result, prop_path)

            # Check items schema for arrays
            if "items" in prop_def and isinstance(prop_def["items"], dict):
                item_properties = prop_def["items"].get("properties")
                if isinstance(item_properties, dict):
                    self._analyze_properties(
                        item_properties,
                        result,
                        f"{prop_path}.items"
                    )

    def _calculate_rigidity_score(self, result: SchemaAnalysisResult) -> int:
        """
        Calculate overall rigidity score (0-100).

        Each issue contributes a weight that depends on its type; the sum
        is capped at 100. Higher score = more rigid schema.
        """
        # Weighted contribution of each issue type
        weights = {
            IssueType.EXACT_COUNT: 15,
            IssueType.OVERLY_SPECIFIC: 10,
            IssueType.NO_FLEXIBILITY: 8,
            IssueType.MISSING_CLASSIFICATIONS: 5,
            IssueType.MISSING_CONTENT_INSTRUCTIONS: 3,
            IssueType.DEPRECATED_EXTENSIONS: 5
        }
        # Unknown issue types fall back to a weight of 5.
        score = sum(weights.get(issue.issue_type, 5) for issue in result.issues)
        # Cap at 100
        return min(100, score)

    def analyze_schema_file(self, schema_path: Path) -> SchemaAnalysisResult:
        """
        Analyze a schema file.

        Args:
            schema_path: Path to JSON schema file

        Returns:
            SchemaAnalysisResult

        Raises:
            FileNotFoundError: If the file does not exist
            json.JSONDecodeError: If the file does not contain valid JSON
        """
        # JSON text is UTF-8; do not depend on the platform default encoding.
        with open(schema_path, encoding="utf-8") as f:
            schema = json.load(f)
        return self.analyze_schema(schema)

    def format_analysis_report(self, result: SchemaAnalysisResult, verbose: bool = False) -> str:
        """
        Format analysis results as a human-readable report.

        Args:
            result: Analysis results
            verbose: Include detailed information (current and suggested
                values of each issue, rendered as JSON)

        Returns:
            Formatted report string
        """
        lines = []

        # Header
        lines.append("=" * 70)
        lines.append("Schema Analysis Report")
        lines.append("=" * 70)
        lines.append("")

        # Overall assessment
        if result.rigidity_score > 70:
            rigidity_level = "HIGH"
        elif result.rigidity_score > 40:
            rigidity_level = "MEDIUM"
        else:
            rigidity_level = "LOW"
        lines.append(f"Rigidity Score: {result.rigidity_score}/100 ({rigidity_level})")
        lines.append(f"Status: {'RIGID - Needs refinement' if result.is_rigid else 'FLEXIBLE - Good'}")
        lines.append("")

        # Features check
        lines.append("Phase 1 Features:")
        lines.append(f" - Classifications: {'Yes' if result.has_classifications else 'No'}")
        lines.append(f" - Content Control: {'Yes' if result.has_content_control else 'No'}")
        if result.uses_deprecated_extensions:
            lines.append(" - Deprecated Extensions: Yes (needs migration)")
        lines.append("")

        # Issue summary
        counts = result.issue_count_by_severity
        lines.append(f"Issues Found: {len(result.issues)} total")
        lines.append(f" - Errors: {counts[IssueSeverity.ERROR]}")
        lines.append(f" - Warnings: {counts[IssueSeverity.WARNING]}")
        lines.append(f" - Info: {counts[IssueSeverity.INFO]}")
        lines.append("")

        # List issues
        if result.issues:
            lines.append("Detected Issues:")
            lines.append("-" * 70)
            for i, issue in enumerate(result.issues, 1):
                if issue.severity == IssueSeverity.ERROR:
                    severity_icon = "ERROR"
                elif issue.severity == IssueSeverity.WARNING:
                    severity_icon = "WARN"
                else:
                    severity_icon = "INFO"
                lines.append(f"{i}. [{severity_icon}] {issue.message}")
                lines.append(f" Path: {issue.path}")
                lines.append(f" Suggestion: {issue.suggestion}")
                if verbose and issue.current_value is not None:
                    lines.append(f" Current: {json.dumps(issue.current_value)}")
                if verbose and issue.suggested_value is not None:
                    lines.append(f" Suggested: {json.dumps(issue.suggested_value)}")
                lines.append("")
        else:
            lines.append("No issues found - schema is well-designed!")
            lines.append("")

        # Recommendations
        if result.is_rigid:
            lines.append("Recommendations:")
            lines.append("-" * 70)
            lines.append("Run: markitect schema-refine <schema-file> --loosen-counts")
            lines.append(" to automatically apply suggested improvements")
            lines.append("")

        return "\n".join(lines)
def analyze_schema_cli(schema_path: str, verbose: bool = False) -> int:
    """
    CLI entry point for schema analysis.

    Analyzes the schema file, prints the formatted report (or an error
    message) to standard output, and maps the outcome to an exit code.

    Args:
        schema_path: Path to schema file
        verbose: Show detailed information

    Returns:
        Exit code: 0 when the schema is flexible, 1 when it is rigid,
        2 when the file is missing, is not valid JSON, or any other
        error occurs.
    """
    schema_analyzer = SchemaAnalyzer()
    try:
        outcome = schema_analyzer.analyze_schema_file(Path(schema_path))
        print(schema_analyzer.format_analysis_report(outcome, verbose=verbose))
        if outcome.is_rigid:
            return 1
        return 0
    except FileNotFoundError:
        print(f"Error: Schema file not found: {schema_path}")
        return 2
    except json.JSONDecodeError as e:
        print(f"Error: Invalid JSON in schema file: {e}")
        return 2
    except Exception as e:
        # Top-level CLI boundary: report any other failure instead of a traceback.
        print(f"Error: {e}")
        return 2

View File

@@ -0,0 +1,466 @@
"""
Schema Generator for Issue #5: Generate a Schema from a Markdown File.
This module provides functionality to analyze markdown AST structures and generate
JSON schemas that describe the document's structural elements with configurable
depth limitations for architectural documentation analysis.
"""
import json
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Any, Optional, Set
from markitect.core.parser import parse_markdown_to_ast
from markitect.exceptions import FileNotFoundError, InvalidDepthError, InvalidInstructionTypeError
class SchemaGenerator:
    """
    Generates JSON schemas from markdown file AST structures.

    Analyzes the structural elements of markdown documents and creates
    JSON schemas that can be used for validation and compliance checking
    in architecture documentation workflows.
    """

    def __init__(self):
        """Initialize the schema generator."""
        # Value written to the "$schema" key of every generated schema.
        self.default_schema_url = "http://json-schema.org/draft-07/schema#"

    def generate_schema_from_file(
        self,
        file_path: Path,
        max_depth: Optional[int] = None,
        mode: Optional[str] = None,
        outline_depth: Optional[int] = None,
        capture_heading_text: bool = False,
        include_content_instructions: bool = False,
        instruction_type: str = 'description'
    ) -> Dict[str, Any]:
        """
        Generate a JSON schema from a markdown file's AST structure.

        Args:
            file_path: Path to the markdown file
            max_depth: Maximum heading depth to include (None = unlimited)
            mode: Generation mode ('outline' for structure-focused schemas)
            outline_depth: Depth limit for outline mode
            capture_heading_text: Whether to capture exact heading text as constraints
            include_content_instructions: Whether to include content instruction fields
            instruction_type: Type of content instructions ('description', 'example', 'constraint', 'template')

        Returns:
            JSON schema as a dictionary

        Raises:
            FileNotFoundError: If the markdown file doesn't exist
            InvalidDepthError: If max_depth is invalid (< 1)
            InvalidInstructionTypeError: If instruction_type is not one of the supported values
        """
        # Validate inputs
        if not file_path.exists():
            raise FileNotFoundError(f"Markdown file not found: {file_path}")
        if max_depth is not None and max_depth < 1:
            raise InvalidDepthError(f"max_depth must be >= 1, got: {max_depth}")

        # Validate instruction type. A tuple (not a set) keeps the order of
        # the names in the error message stable between runs.
        valid_instruction_types = ('description', 'example', 'constraint', 'template')
        if instruction_type not in valid_instruction_types:
            raise InvalidInstructionTypeError(f"Invalid instruction type '{instruction_type}'. Must be one of: {', '.join(valid_instruction_types)}")

        # Read and parse the markdown file
        content = file_path.read_text(encoding='utf-8')
        ast_tokens = parse_markdown_to_ast(content)

        # Analyze the AST structure
        structure_analysis = self._analyze_ast_structure(ast_tokens, max_depth)

        # Generate the JSON schema
        return self._create_json_schema(
            structure_analysis,
            file_path.name,
            mode=mode,
            outline_depth=outline_depth,
            capture_heading_text=capture_heading_text,
            include_content_instructions=include_content_instructions,
            instruction_type=instruction_type
        )

    def _analyze_ast_structure(self, tokens: List[Dict[str, Any]], max_depth: Optional[int]) -> Dict[str, Any]:
        """
        Analyze AST tokens to extract structural patterns.

        Args:
            tokens: List of AST tokens from markdown-it
            max_depth: Maximum heading depth to analyze

        Returns:
            Dictionary containing structural analysis. 'headings' maps
            'level_N' to a list of heading records; 'structure_types' is a
            sorted list of every token type encountered; the remaining keys
            hold one record per element found.
        """
        analysis = {
            'headings': defaultdict(list),
            'paragraphs': [],
            'lists': [],
            'code_blocks': [],
            'blockquotes': [],
            'tables': [],
            'links': [],
            'images': [],
            'emphasis': [],
            'structure_types': set()
        }
        # Level of the most recent heading that was recorded (0 = none yet).
        current_heading_level = 0

        for i, token in enumerate(tokens):
            token_type = token.get('type', '')

            # Track all structural types found
            analysis['structure_types'].add(token_type)

            # Analyze headings with depth filtering
            if token_type == 'heading_open':
                level = self._extract_heading_level(token.get('tag', ''))
                if max_depth is None or level <= max_depth:
                    heading_content = self._extract_heading_content(tokens, i)
                    analysis['headings'][f'level_{level}'].append({
                        'content': heading_content,
                        'level': level,
                        'position': i
                    })
                    current_heading_level = level

            # Analyze paragraphs
            elif token_type == 'paragraph_open':
                paragraph_content = self._extract_paragraph_content(tokens, i)
                analysis['paragraphs'].append({
                    'content': paragraph_content,
                    'position': i,
                    'under_heading_level': current_heading_level
                })

            # Analyze lists
            elif token_type in ['bullet_list_open', 'ordered_list_open']:
                list_structure = self._extract_list_structure(tokens, i)
                analysis['lists'].append({
                    'type': 'bullet' if token_type == 'bullet_list_open' else 'ordered',
                    'structure': list_structure,
                    'position': i,
                    'under_heading_level': current_heading_level
                })

            # Analyze code blocks
            elif token_type == 'code_block' or token_type == 'fence':
                code_info = self._extract_code_block_info(token)
                analysis['code_blocks'].append({
                    'language': code_info.get('language', ''),
                    'content_length': len(code_info.get('content', '')),
                    'position': i,
                    'under_heading_level': current_heading_level
                })

            # Analyze blockquotes
            elif token_type == 'blockquote_open':
                quote_content = self._extract_blockquote_content(tokens, i)
                analysis['blockquotes'].append({
                    'content': quote_content,
                    'position': i,
                    'under_heading_level': current_heading_level
                })

            # Analyze tables
            elif token_type == 'table_open':
                table_structure = self._extract_table_structure(tokens, i)
                analysis['tables'].append({
                    'columns': table_structure.get('columns', 0),
                    'rows': table_structure.get('rows', 0),
                    'position': i,
                    'under_heading_level': current_heading_level
                })

            # Analyze inline elements
            elif token_type == 'inline':
                inline_analysis = self._analyze_inline_content(token)
                analysis['links'].extend(inline_analysis.get('links', []))
                analysis['images'].extend(inline_analysis.get('images', []))
                analysis['emphasis'].extend(inline_analysis.get('emphasis', []))

        # Convert the set to a list for JSON serialization. Sorting makes the
        # result reproducible: set iteration order varies between runs, and
        # this list ends up as a "const" in the generated schema.
        analysis['structure_types'] = sorted(analysis['structure_types'], key=str)
        return analysis

    def _create_json_schema(
        self,
        analysis: Dict[str, Any],
        filename: str,
        mode: Optional[str] = None,
        outline_depth: Optional[int] = None,
        capture_heading_text: bool = False,
        include_content_instructions: bool = False,
        instruction_type: str = 'description'
    ) -> Dict[str, Any]:
        """
        Create a JSON schema from structural analysis.

        Args:
            analysis: Structural analysis of the document
            filename: Name of the source file
            mode: Generation mode ('outline' for structure-focused schemas)
            outline_depth: Depth limit for outline mode
            capture_heading_text: Whether to capture exact heading text as constraints
            include_content_instructions: Whether to include content instruction fields
            instruction_type: Type of content instructions to generate

        Returns:
            JSON schema dictionary
        """
        # Determine title format based on mode
        title_preposition = "from" if mode == "outline" else "for"
        schema = {
            "$schema": self.default_schema_url,
            "type": "object",
            "title": f"Schema {title_preposition} {filename}",
            "description": f"JSON schema describing the structure of {filename}",
            "properties": {}
        }

        # Add metaschema extensions for outline mode
        if mode == "outline":
            schema["x-markitect-outline-mode"] = True
            if outline_depth is not None:
                schema["x-markitect-outline-depth"] = outline_depth

        # Add metaschema extension for heading text capture
        if capture_heading_text:
            schema["x-markitect-heading-text-capture"] = True

        # Add metaschema extension for content instructions
        if include_content_instructions:
            schema["x-markitect-content-instructions-enabled"] = True

        # Add heading structure
        if analysis['headings']:
            heading_properties = {}
            for level_key, headings in analysis['headings'].items():
                if not headings:
                    # Only include levels that have content
                    continue

                # Configure content property based on heading text capture
                if capture_heading_text:
                    # Extract actual heading texts in document order
                    heading_texts = [heading['content'] for heading in headings]
                    content_property = {"enum": heading_texts}
                else:
                    content_property = {"type": "string"}

                # Build properties for the heading item
                item_properties = {
                    "content": content_property,
                    "level": {"type": "integer"},
                    "position": {"type": "integer"}
                }

                # Add content instruction fields if enabled
                if include_content_instructions:
                    # Generate appropriate instruction text based on heading level
                    level_num = int(level_key.split('_')[1])
                    section_name = f"level {level_num} heading"
                    instruction_text = self._generate_content_instruction(section_name, instruction_type)
                    item_properties["x-markitect-content-instructions"] = {
                        "type": "string",
                        "const": instruction_text
                    }
                    item_properties["x-markitect-instruction-type"] = {
                        "type": "string",
                        "enum": [instruction_type]
                    }

                # The generated schema requires exactly as many headings
                # per level as the source document has.
                heading_properties[level_key] = {
                    "type": "array",
                    "description": f"Headings at {level_key.replace('_', ' ')}",
                    "items": {
                        "type": "object",
                        "properties": item_properties,
                        "required": ["content", "level"]
                    },
                    "minItems": len(headings),
                    "maxItems": len(headings)
                }

            if heading_properties:
                schema["properties"]["headings"] = {
                    "type": "object",
                    "description": "Document heading structure",
                    "properties": heading_properties
                }

        # Add other structural elements
        structural_elements = {
            "paragraphs": ("Text paragraphs", analysis['paragraphs']),
            "lists": ("Lists (ordered and unordered)", analysis['lists']),
            "code_blocks": ("Code blocks and fenced code", analysis['code_blocks']),
            "blockquotes": ("Block quotations", analysis['blockquotes']),
            "tables": ("Tables with rows and columns", analysis['tables']),
            "links": ("Links to external resources", analysis['links']),
            "images": ("Embedded images", analysis['images']),
            "emphasis": ("Text emphasis (bold, italic)", analysis['emphasis'])
        }
        for element_name, (description, element_list) in structural_elements.items():
            if not element_list:
                continue

            # Build base schema for the element
            element_schema = {
                "type": "array",
                "description": description,
                "minItems": len(element_list),
                "maxItems": len(element_list)
            }

            # Add content instructions for paragraphs and lists if enabled
            if include_content_instructions and element_name in ["paragraphs", "lists"]:
                element_schema["items"] = {
                    "type": "object",
                    "properties": {
                        "content": {"type": "string"},
                        "x-markitect-content-instructions": {
                            "type": "string",
                            "const": self._generate_content_instruction(element_name, instruction_type)
                        },
                        "x-markitect-instruction-type": {
                            "type": "string",
                            "enum": [instruction_type]
                        }
                    }
                }
            schema["properties"][element_name] = element_schema

        # Add metadata
        # NOTE(review): total_elements sums only the list-valued entries of
        # `analysis`, so the per-level heading mapping is not counted while
        # the 'structure_types' list is - confirm this is intended.
        schema["properties"]["metadata"] = {
            "type": "object",
            "description": "Document structure metadata",
            "properties": {
                "total_elements": {
                    "type": "integer",
                    "const": sum(len(v) if isinstance(v, list) else 0 for v in analysis.values())
                },
                "structure_types": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "All structural element types found",
                    "const": analysis['structure_types']
                }
            }
        }
        return schema

    def _extract_heading_level(self, tag: str) -> int:
        """Extract heading level from HTML tag (h1, h2, etc.); defaults to 1."""
        if tag.startswith('h') and len(tag) == 2:
            try:
                return int(tag[1])
            except ValueError:
                pass
        return 1

    def _extract_heading_content(self, tokens: List[Dict[str, Any]], start_index: int) -> str:
        """Extract text content from heading tokens ('' if none is found)."""
        # Look for the inline token that contains the heading text; it is
        # searched for within the three tokens starting at start_index.
        for token in tokens[start_index:start_index + 3]:
            if token.get('type') == 'inline':
                return token.get('content', '')
        return ''

    def _extract_paragraph_content(self, tokens: List[Dict[str, Any]], start_index: int) -> str:
        """Extract text content from paragraph tokens ('' if none is found)."""
        # Look for the inline token that contains the paragraph text; it is
        # searched for within the three tokens starting at start_index.
        for token in tokens[start_index:start_index + 3]:
            if token.get('type') == 'inline':
                return token.get('content', '')
        return ''

    def _extract_list_structure(self, tokens: List[Dict[str, Any]], start_index: int) -> Dict[str, Any]:
        """Extract list structure information."""
        # This is a simplified implementation
        # In a full implementation, we'd parse the nested list structure
        return {
            "type": "list",
            "estimated_items": 1  # Placeholder - would need more complex parsing
        }

    def _extract_code_block_info(self, token: Dict[str, Any]) -> Dict[str, Any]:
        """Extract code block information.

        Returns a dict with 'language' (first word of the token's info
        string, or '' when there is none) and 'content'.
        """
        # split() on an empty or whitespace-only info string yields no
        # words, so guard before taking the first one.
        info_words = (token.get('info') or '').split()
        return {
            "language": info_words[0] if info_words else '',
            "content": token.get('content', '')
        }

    def _extract_blockquote_content(self, tokens: List[Dict[str, Any]], start_index: int) -> str:
        """Extract blockquote content."""
        # Simplified implementation: always returns the same placeholder text
        return "blockquote content"

    def _extract_table_structure(self, tokens: List[Dict[str, Any]], start_index: int) -> Dict[str, Any]:
        """Extract table structure information."""
        # Simplified implementation
        return {
            "columns": 2,  # Placeholder
            "rows": 1  # Placeholder
        }

    def _analyze_inline_content(self, token: Dict[str, Any]) -> Dict[str, List[Any]]:
        """Analyze inline content for links, images, emphasis."""
        result = {
            "links": [],
            "images": [],
            "emphasis": []
        }
        # Analyze children tokens if they exist; a missing or None
        # 'children' entry is treated as no children.
        for child in token.get('children') or []:
            if child and isinstance(child, dict):
                child_type = child.get('type', '')
                if child_type == 'link_open':
                    result['links'].append({"type": "link"})
                elif child_type == 'image':
                    result['images'].append({"type": "image"})
                elif child_type in ['em_open', 'strong_open']:
                    result['emphasis'].append({"type": child_type})
        return result

    def _generate_content_instruction(self, heading_text: str, instruction_type: str) -> str:
        """
        Generate appropriate content instruction text based on heading and instruction type.

        Args:
            heading_text: The text of the heading
            instruction_type: Type of instruction to generate

        Returns:
            Instruction text for the content field
        """
        templates = {
            "description": "Provide content for the '{}' section",
            "example": "Example content for the '{}' section",
            "constraint": "Content must be relevant to '{}'",
            "template": "Template content for '{}' section",
        }
        # Unknown instruction types fall back to a generic sentence.
        template = templates.get(instruction_type, "Content for the '{}' section")
        return template.format(heading_text)

610
markitect/schema/loader.py Normal file
View File

@@ -0,0 +1,610 @@
"""
Schema Loader - Extract JSON schemas from markdown files.
This module provides functionality to load schemas from markdown files that
contain embedded JSON schemas in code blocks, along with YAML frontmatter
metadata and rich documentation.
Markdown Schema Format:
---
schema-id: "https://markitect.dev/schemas/domain/v1"
version: "1.0.0"
status: "stable|draft|deprecated"
---
# Schema Title v1.0
## Documentation sections...
## Schema Definition
```json
{
"$schema": "http://json-schema.org/draft-07/schema#",
...
}
```
This enables:
- Rich documentation alongside schemas
- Version history in same file
- Human-readable schema files
- Markdown-first approach aligned with MarkiTect philosophy
"""
import re
import json
import yaml
from pathlib import Path
from typing import Dict, Any, Optional, List, Tuple
class SchemaLoaderError(Exception):
    """Common base class for the errors raised while loading a schema."""
class InvalidSchemaFormatError(SchemaLoaderError):
    """Raised when a schema file cannot be read or its content is malformed."""
class SchemaNotFoundError(SchemaLoaderError):
    """Raised when a markdown file yields no JSON schema."""
class MarkdownSchemaLoader:
"""
Load and parse markdown schema files.
Supports:
- YAML frontmatter for metadata
- JSON code blocks for schema definition
- Validation of schema structure
- Metadata merging
Example:
>>> loader = MarkdownSchemaLoader()
>>> schema_data = loader.load_schema(Path("manpage-schema-v1.0.md"))
>>> schema = schema_data['schema']
>>> metadata = schema_data['metadata']
"""
def __init__(self):
"""Initialize the schema loader with regex patterns."""
# Pattern to match YAML frontmatter
# Matches: --- ... --- at start of file
self.frontmatter_pattern = re.compile(
r'^---\s*\n(.*?)\n---\s*\n',
re.DOTALL | re.MULTILINE
)
# Pattern to match JSON code blocks
# Matches: ```json ... ```
self.json_code_block_pattern = re.compile(
r'```json\s*\n(.*?)\n```',
re.DOTALL | re.MULTILINE
)
# Pattern to find Schema Definition section
# This helps us find the right JSON block if there are multiple
self.schema_section_pattern = re.compile(
r'##\s+Schema Definition\s*\n',
re.MULTILINE
)
def load_schema(self, md_path: Path) -> Dict[str, Any]:
"""
Load schema from markdown file.
Args:
md_path: Path to markdown schema file
Returns:
Dictionary containing:
- schema: Extracted JSON schema (dict)
- metadata: Frontmatter metadata (dict)
- documentation: Full markdown content (str)
- source_file: Source file path (str)
Raises:
FileNotFoundError: If schema file doesn't exist
InvalidSchemaFormatError: If file format is invalid
SchemaNotFoundError: If no JSON schema found
Example:
>>> loader = MarkdownSchemaLoader()
>>> data = loader.load_schema(Path("manpage-schema-v1.0.md"))
>>> print(data['schema']['title'])
'Unix Manual Page Schema'
"""
if not md_path.exists():
raise FileNotFoundError(f"Schema file not found: {md_path}")
# Read file content
try:
content = md_path.read_text(encoding='utf-8')
except Exception as e:
raise InvalidSchemaFormatError(f"Failed to read schema file: {e}")
# Extract frontmatter
metadata = self._extract_frontmatter(content)
# Extract JSON schema
schema = self._extract_json_schema(content)
if not schema:
raise SchemaNotFoundError(
f"No JSON schema found in {md_path}. "
f"Expected a ```json code block with schema definition."
)
# Merge metadata into schema
schema = self._merge_metadata(schema, metadata, md_path)
return {
'schema': schema,
'metadata': metadata,
'documentation': content,
'source_file': str(md_path)
}
def _extract_frontmatter(self, content: str) -> Dict[str, Any]:
"""
Extract YAML frontmatter from markdown content.
Args:
content: Markdown file content
Returns:
Dictionary of frontmatter metadata (empty if none found)
Raises:
InvalidSchemaFormatError: If YAML is malformed
"""
match = self.frontmatter_pattern.search(content)
if not match:
return {}
yaml_content = match.group(1)
try:
metadata = yaml.safe_load(yaml_content) or {}
if not isinstance(metadata, dict):
raise InvalidSchemaFormatError(
f"Frontmatter must be a YAML dictionary, got {type(metadata)}"
)
return metadata
except yaml.YAMLError as e:
raise InvalidSchemaFormatError(f"Invalid YAML frontmatter: {e}")
def _extract_json_schema(self, content: str) -> Optional[Dict[str, Any]]:
"""
Extract JSON schema from markdown code blocks.
Prefers JSON blocks under "## Schema Definition" section,
but will use first JSON block if no Schema Definition section found.
Args:
content: Markdown file content
Returns:
JSON schema dictionary or None if not found
Raises:
InvalidSchemaFormatError: If JSON is malformed
"""
# Find all JSON code blocks
json_blocks = self.json_code_block_pattern.findall(content)
if not json_blocks:
return None
# Try to find the Schema Definition section
schema_section_match = self.schema_section_pattern.search(content)
if schema_section_match:
# Find JSON block that comes after Schema Definition section
section_pos = schema_section_match.end()
# Re-search for JSON blocks starting from section position
remaining_content = content[section_pos:]
section_json_blocks = self.json_code_block_pattern.findall(remaining_content)
if section_json_blocks:
json_text = section_json_blocks[0]
else:
# Fallback to first JSON block in entire document
json_text = json_blocks[0]
else:
# No Schema Definition section, use first JSON block
json_text = json_blocks[0]
# Parse JSON
try:
schema = json.loads(json_text)
if not isinstance(schema, dict):
raise InvalidSchemaFormatError(
f"Schema must be a JSON object, got {type(schema)}"
)
return schema
except json.JSONDecodeError as e:
raise InvalidSchemaFormatError(f"Invalid JSON schema: {e}")
def _merge_metadata(
self,
schema: Dict[str, Any],
metadata: Dict[str, Any],
source_file: Path
) -> Dict[str, Any]:
"""
Merge frontmatter metadata into schema.
Adds x-markitect-source extension with file info and metadata.
Optionally overrides schema fields with frontmatter values.
Args:
schema: JSON schema dictionary
metadata: Frontmatter metadata dictionary
source_file: Path to source file
Returns:
Schema with merged metadata
"""
# Create a copy to avoid modifying original
merged_schema = schema.copy()
# Add MarkiTect-specific source metadata
merged_schema['x-markitect-source'] = {
'file': str(source_file),
'filename': source_file.name,
'format': 'markdown',
'frontmatter': metadata
}
# Override schema fields with frontmatter if present
# This allows frontmatter to be the source of truth for metadata
if 'version' in metadata:
merged_schema['version'] = metadata['version']
if 'schema-id' in metadata:
merged_schema['$id'] = metadata['schema-id']
if 'status' in metadata:
if 'x-markitect-metadata' not in merged_schema:
merged_schema['x-markitect-metadata'] = {}
merged_schema['x-markitect-metadata']['status'] = metadata['status']
return merged_schema
def save_schema(
    self,
    schema: Dict[str, Any],
    md_path: Path,
    template: Optional[str] = None,
    frontmatter: Optional[Dict[str, Any]] = None
) -> None:
    """
    Save schema as markdown file.

    The content is rendered from ``template`` when one is given,
    otherwise generated in the default layout. Missing parent
    directories of ``md_path`` are created.

    Args:
        schema: JSON schema dictionary to save
        md_path: Output path for markdown file
        template: Optional markdown template string
        frontmatter: Optional frontmatter metadata

    Raises:
        InvalidSchemaFormatError: If the file cannot be written, or the
            template references a key that is not available

    Example:
        >>> loader = MarkdownSchemaLoader()
        >>> loader.save_schema(
        ...     schema={'title': 'My Schema', ...},
        ...     md_path=Path('my-schema-v1.0.md')
        ... )
    """
    if template:
        content = self._render_template(template, schema, frontmatter)
    else:
        content = self._generate_markdown(schema, frontmatter)

    # Create parent directory if needed
    md_path.parent.mkdir(parents=True, exist_ok=True)

    try:
        md_path.write_text(content, encoding='utf-8')
    except Exception as e:
        # Chain the cause so the original I/O error remains inspectable.
        raise InvalidSchemaFormatError(f"Failed to write schema file: {e}") from e
def _generate_markdown(
    self,
    schema: Dict[str, Any],
    frontmatter: Optional[Dict[str, Any]] = None
) -> str:
    """
    Generate markdown from schema.

    Frontmatter defaults are filled in for ``schema-id`` (when the schema
    has a non-empty ``$id``), ``version`` and ``status`` ('draft'). The
    caller's ``frontmatter`` dictionary is not modified.

    Args:
        schema: JSON schema dictionary
        frontmatter: Optional frontmatter metadata

    Returns:
        Markdown content as string
    """
    # Extract metadata from schema
    title = schema.get('title', 'Untitled Schema')
    version = schema.get('version', '1.0.0')
    description = schema.get('description', '')
    schema_id = schema.get('$id', '')

    # Work on a copy so the defaults added below never leak into the
    # dictionary owned by the caller.
    frontmatter = dict(frontmatter) if frontmatter is not None else {}

    # Set defaults
    if 'schema-id' not in frontmatter and schema_id:
        frontmatter['schema-id'] = schema_id
    frontmatter.setdefault('version', version)
    frontmatter.setdefault('status', 'draft')

    # Generate frontmatter YAML
    frontmatter_yaml = yaml.dump(
        frontmatter,
        default_flow_style=False,
        allow_unicode=True
    ).strip()

    # Generate JSON (pretty-printed)
    schema_json = json.dumps(schema, indent=2, ensure_ascii=False)

    # Build markdown content; the literal must start at column 0 because
    # it is emitted verbatim as the file content.
    md_content = f"""---
{frontmatter_yaml}
---
# {title} v{version}
## Overview
{description}
## Usage
```bash
markitect validate document.md --schema {Path(frontmatter.get('schema-id', 'schema')).name}
```
## Schema Definition
```json
{schema_json}
```
## Version History
### v{version}
- Initial version
"""
    return md_content
def _render_template(
self,
template: str,
schema: Dict[str, Any],
frontmatter: Optional[Dict[str, Any]] = None
) -> str:
"""
Render markdown from template.
Simple template rendering using string formatting.
For complex templates, consider using Jinja2 or similar.
Args:
template: Template string
schema: JSON schema dictionary
frontmatter: Optional frontmatter metadata
Returns:
Rendered markdown content
"""
# Build context for template
context = {
'title': schema.get('title', 'Untitled'),
'version': schema.get('version', '1.0.0'),
'description': schema.get('description', ''),
'schema_id': schema.get('$id', ''),
'schema_json': json.dumps(schema, indent=2, ensure_ascii=False),
'frontmatter': frontmatter or {},
}
# Simple template rendering
try:
return template.format(**context)
except KeyError as e:
raise InvalidSchemaFormatError(f"Template missing key: {e}")
def list_json_blocks(self, content: str) -> List[Tuple[int, str]]:
    """
    Enumerate every JSON code block found in markdown content.

    Handy for debugging, or when a document carries several JSON blocks.

    Args:
        content: Markdown file content

    Returns:
        List of (start_offset, json_text) tuples in document order, where
        json_text is the first capture group of the JSON block pattern

    Example:
        >>> loader = MarkdownSchemaLoader()
        >>> content = Path('schema.md').read_text()
        >>> blocks = loader.list_json_blocks(content)
        >>> print(f"Found {len(blocks)} JSON blocks")
    """
    return [
        (found.start(), found.group(1))
        for found in self.json_code_block_pattern.finditer(content)
    ]
def validate_schema_structure(self, schema: Dict[str, Any]) -> List[str]:
    """
    Run basic structural checks on a schema.

    Reports missing JSON Schema fields and MarkiTect conventions, and
    checks the format of ``$id`` when it is present.

    Args:
        schema: JSON schema dictionary

    Returns:
        List of warning/error messages (empty if valid)

    Example:
        >>> loader = MarkdownSchemaLoader()
        >>> issues = loader.validate_schema_structure(schema)
        >>> if issues:
        ...     print("Schema issues:", issues)
    """
    # (key, message) pairs, in the order the messages are reported.
    presence_checks = (
        ('$schema', "Missing required field: $schema"),
        ('type', "Missing recommended field: type"),
        ('title', "Missing recommended field: title"),
        ('description', "Missing recommended field: description"),
        ('version', "Missing MarkiTect convention: version field"),
        ('$id', "Missing recommended field: $id"),
    )
    issues = [message for key, message in presence_checks if key not in schema]

    # Format checks only apply when $id exists.
    if '$id' in schema:
        identifier = schema['$id']
        if not isinstance(identifier, str):
            issues.append("$id must be a string")
        elif not identifier.startswith('https://'):
            issues.append("$id should be a full HTTPS URL")

    return issues
def auto_ingest_schemas(db_manager=None, schema_dir: Optional[Path] = None, verbose: bool = False) -> Dict[str, Any]:
    """Ingest not-yet-registered markdown schemas into the database.

    Scans the schema directory for files matching ``*-schema-v*.md`` and
    stores every schema whose filename is not already listed by the
    database. Useful for post-install setup or automatic registration.

    Args:
        db_manager: DatabaseManager instance (optional; when omitted one is
            created for ``~/.markitect/markitect.db`` and initialized)
        schema_dir: Directory containing schemas (defaults to the
            ``schemas`` directory next to this module's parent package)
        verbose: If True, print detailed progress messages

    Returns:
        Dictionary with ingestion results:
        {
            'ingested': [list of schema names that were ingested],
            'skipped': [list of schema names that were already present],
            'failed': [list of (schema_name, error) tuples for failures]
        }
        All three lists are empty when the directory does not exist or
        the existing schemas cannot be listed.

    Example:
        >>> from markitect.schema.loader import auto_ingest_schemas
        >>> results = auto_ingest_schemas(verbose=True)
        >>> print(f"Ingested {len(results['ingested'])} schemas")
    """
    def empty_results() -> Dict[str, Any]:
        # Fresh lists on every call so early returns never share state.
        return {'ingested': [], 'skipped': [], 'failed': []}

    # Resolve the directory to scan; nothing to do when it is absent.
    if schema_dir is None:
        schema_dir = Path(__file__).parent.parent / "schemas"
    if not schema_dir.exists():
        if verbose:
            print(f"Warning: Schema directory not found: {schema_dir}")
        return empty_results()

    # Fall back to the per-user database when no manager was supplied.
    if db_manager is None:
        from markitect.database import DatabaseManager
        default_db = Path.home() / '.markitect' / 'markitect.db'
        db_manager = DatabaseManager(str(default_db))
        db_manager.initialize_database()

    # Names already stored; ingestion is abandoned if they cannot be listed.
    try:
        known_names = {entry['name'] for entry in db_manager.list_schemas()}
    except Exception as e:
        if verbose:
            print(f"Error listing existing schemas: {e}")
        return empty_results()

    results = empty_results()

    candidates = sorted(schema_dir.glob("*-schema-v*.md"))
    if verbose and candidates:
        print(f"Found {len(candidates)} schema file(s) in {schema_dir}")

    loader = MarkdownSchemaLoader()
    for schema_file in candidates:
        schema_name = schema_file.name

        if schema_name in known_names:
            results['skipped'].append(schema_name)
            if verbose:
                print(f"Skipping {schema_name} (already ingested)")
            continue

        # Any failure for one file is recorded and the loop carries on.
        try:
            schema_data = loader.load_schema(schema_file)['schema']
            serialized = json.dumps(schema_data, indent=2)
            record_id = db_manager.store_schema_file(schema_name, serialized)
            if not record_id:
                results['failed'].append((schema_name, "Failed to store in database"))
                if verbose:
                    print(f"Failed to store {schema_name} in database")
            else:
                results['ingested'].append(schema_name)
                if verbose:
                    title = schema_data.get('title', schema_name)
                    print(f"Ingested {schema_name} (title: {title})")
        except Exception as e:
            results['failed'].append((schema_name, str(e)))
            if verbose:
                print(f"Failed to ingest {schema_name}: {e}")

    if verbose:
        print("\nAuto-ingestion complete:")
        print(f" Ingested: {len(results['ingested'])}")
        print(f" Skipped: {len(results['skipped'])}")
        print(f" Failed: {len(results['failed'])}")

    return results

369
markitect/schema/naming.py Normal file
View File

@@ -0,0 +1,369 @@
"""
Schema Naming Validation - Enforce filename conventions for schemas.
This module provides validation and utilities for schema filename conventions
to ensure consistency across the MarkiTect schema ecosystem.
Naming Convention:
Format: {domain}-schema-v{major}.{minor}.md
Components:
- domain: lowercase, hyphen-separated identifier (e.g., "manpage", "api-documentation")
- schema: literal string "schema"
- version: SemVer major.minor (e.g., "v1.0", "v2.1")
- extension: ".md" (markdown)
Valid Examples:
- manpage-schema-v1.0.md
- terminology-schema-v1.0.md
- api-documentation-schema-v1.0.md
- my-custom-type-schema-v2.1.md
Invalid Examples:
- manpage.json (missing version and wrong extension)
- manpage-v1.md (missing "schema" keyword)
- ManPage-Schema-v1.0.md (wrong case - must be lowercase)
- manpage-schema-1.0.md (missing 'v' prefix)
- manpage-schema-v1.md (missing minor version)
"""
import re
from pathlib import Path
from typing import Tuple, Optional, Dict, Any
# Compiled regex for schema filename validation.
# Matches: {domain}-schema-v{major}.{minor}.md
#   - domain: lowercase letters, digits and hyphens, starting with a letter
#   - major / minor: one or more digits each
# The named groups (domain, major, minor) are read by
# validate_schema_filename() to build its metadata dictionary.
SCHEMA_FILENAME_PATTERN = re.compile(
    r'^(?P<domain>[a-z][a-z0-9-]*)-schema-v(?P<major>\d+)\.(?P<minor>\d+)\.md$'
)
class SchemaFilenameError(Exception):
    """Raised when a filename does not follow the schema naming convention."""
def validate_schema_filename(filename: str) -> Tuple[bool, Optional[Dict[str, Any]]]:
    """
    Validate schema filename against naming convention.

    Args:
        filename: The filename to validate (e.g., "manpage-schema-v1.0.md")

    Returns:
        Tuple of (is_valid, metadata_dict or None)

        If valid, metadata_dict contains:
        - domain: str - The domain identifier
        - version: str - Full version string (e.g., "1.0")
        - major: int - Major version number
        - minor: int - Minor version number
        - filename: str - The original filename

        If invalid, metadata_dict is None

    Examples:
        >>> validate_schema_filename("manpage-schema-v1.0.md")
        (True, {'domain': 'manpage', 'version': '1.0', ...})
        >>> validate_schema_filename("invalid.json")
        (False, None)
    """
    # fullmatch rather than match: with match(), the pattern's trailing '$'
    # also matches just before a final newline, so "name-schema-v1.0.md\n"
    # would be accepted and returned as a valid filename.
    match = SCHEMA_FILENAME_PATTERN.fullmatch(filename)
    if match is None:
        return False, None

    major = match.group('major')
    minor = match.group('minor')
    return True, {
        'domain': match.group('domain'),
        'version': f"{major}.{minor}",
        'major': int(major),
        'minor': int(minor),
        'filename': filename
    }
def suggest_valid_filename(
    domain: str,
    version: str = "1.0",
    normalize: bool = True
) -> str:
    """
    Build a convention-compliant schema filename from a domain and version.

    Args:
        domain: The schema domain (e.g., "manpage", "API Documentation")
        version: Version string in format "major.minor" (default: "1.0")
        normalize: Whether to lowercase the domain and turn spaces and
            underscores into single hyphens before validating it

    Returns:
        Valid schema filename

    Raises:
        ValueError: If domain or version format is invalid

    Examples:
        >>> suggest_valid_filename("manpage", "1.0")
        'manpage-schema-v1.0.md'
        >>> suggest_valid_filename("API Documentation", "2.1")
        'api-documentation-schema-v2.1.md'
        >>> suggest_valid_filename("My_Custom_Type", "1.0")
        'my-custom-type-schema-v1.0.md'
    """
    if not domain:
        raise ValueError("Domain cannot be empty")

    domain_clean = domain
    if normalize:
        # Every run of spaces, underscores and hyphens collapses to a single
        # hyphen; hyphens left at either end are then trimmed.
        domain_clean = re.sub(r'[ _-]+', '-', domain.lower()).strip('-')

    # The domain must start with a letter and use only lowercase letters,
    # digits and hyphens.
    if re.match(r'^[a-z][a-z0-9-]*$', domain_clean) is None:
        raise ValueError(
            f"Invalid domain '{domain_clean}': must start with lowercase letter "
            "and contain only lowercase letters, numbers, and hyphens"
        )

    # The version must consist of exactly two integer components.
    pieces = version.split('.')
    if len(pieces) != 2:
        raise ValueError(
            f"Invalid version '{version}': must be in format 'major.minor' (e.g., '1.0')"
        )

    try:
        major, minor = (int(piece) for piece in pieces)
    except ValueError:
        raise ValueError(
            f"Invalid version '{version}': major and minor must be integers"
        )

    if min(major, minor) < 0:
        raise ValueError(
            f"Invalid version '{version}': major and minor must be non-negative"
        )

    return f"{domain_clean}-schema-v{major}.{minor}.md"


# Alias for backward compatibility
suggest_schema_filename = suggest_valid_filename
def extract_schema_domain(filename: str) -> str:
    """
    Return the domain component of a valid schema filename.

    Args:
        filename: Schema filename to parse

    Returns:
        The domain identifier

    Raises:
        SchemaFilenameError: If filename is invalid

    Examples:
        >>> extract_schema_domain("manpage-schema-v1.0.md")
        'manpage'
    """
    ok, info = validate_schema_filename(filename)
    if ok:
        return info['domain']
    raise SchemaFilenameError(
        f"Invalid schema filename: {filename}\n"
        "Expected format: {domain}-schema-v{major}.{minor}.md"
    )
def get_schema_version(filename: str) -> str:
    """
    Return the "major.minor" version of a valid schema filename.

    Args:
        filename: Schema filename to parse

    Returns:
        Version string (e.g., "1.0")

    Raises:
        SchemaFilenameError: If filename is invalid

    Examples:
        >>> get_schema_version("manpage-schema-v1.0.md")
        '1.0'
    """
    ok, info = validate_schema_filename(filename)
    if ok:
        return info['version']
    raise SchemaFilenameError(
        f"Invalid schema filename: {filename}\n"
        "Expected format: {domain}-schema-v{major}.{minor}.md"
    )
def extract_schema_metadata(filename: str) -> Dict[str, Any]:
    """
    Return the metadata parsed from a valid schema filename.

    Args:
        filename: Schema filename to parse

    Returns:
        The metadata dictionary produced by validate_schema_filename()

    Raises:
        SchemaFilenameError: If filename is invalid

    Examples:
        >>> extract_schema_metadata("manpage-schema-v1.0.md")
        {'domain': 'manpage', 'version': '1.0', 'major': 1, 'minor': 0, 'filename': 'manpage-schema-v1.0.md'}
    """
    ok, info = validate_schema_filename(filename)
    if ok:
        return info
    raise SchemaFilenameError(
        f"Invalid schema filename: {filename}\n"
        "Expected format: {domain}-schema-v{major}.{minor}.md"
    )
def get_validation_errors(filename: str) -> list:
    """
    Explain why a filename violates the schema naming convention.

    Args:
        filename: Filename to validate

    Returns:
        List of error messages (empty if valid). For an invalid name the
        first entry is always the generic pattern-mismatch message,
        followed by whichever specific checks failed.

    Examples:
        >>> get_validation_errors("manpage-schema-v1.0.md")
        []
        >>> get_validation_errors("invalid.json")
        ['Filename does not match pattern: {domain}-schema-v{major}.{minor}.md', ...]
    """
    passed, _ = validate_schema_filename(filename)
    if passed:
        return []

    problems = [
        "Filename does not match pattern: {domain}-schema-v{major}.{minor}.md"
    ]

    # Extension
    if not filename.endswith('.md'):
        problems.append(f"Extension must be '.md', got: {Path(filename).suffix}")

    # Version marker: absent altogether, or present but malformed
    if '-v' not in filename:
        problems.append("Missing version: filename must include '-v{major}.{minor}'")
    elif re.search(r'-v\d+\.\d+', filename) is None:
        problems.append(
            "Invalid version format: must be '-v{major}.{minor}' (e.g., '-v1.0')"
        )

    # Literal "schema" keyword
    if '-schema-' not in filename:
        problems.append("Missing '-schema-' keyword in filename")

    # Case
    if any(map(str.isupper, filename)):
        problems.append("Filename must be lowercase")

    # Domain: the text before the first '-schema-', or the whole filename
    # when that keyword is missing.
    domain = filename.split('-schema-')[0]
    if domain and re.match(r'^[a-z][a-z0-9-]*$', domain) is None:
        problems.append(
            f"Invalid domain '{domain}': must start with lowercase letter "
            "and contain only lowercase letters, numbers, and hyphens"
        )

    return problems
def is_valid_schema_filename(filename: str) -> bool:
    """
    Tell whether a filename follows the schema naming convention.

    Convenience wrapper that returns only the boolean part of
    validate_schema_filename().

    Args:
        filename: Filename to check

    Returns:
        True if valid, False otherwise

    Examples:
        >>> is_valid_schema_filename("manpage-schema-v1.0.md")
        True
        >>> is_valid_schema_filename("invalid.json")
        False
    """
    return validate_schema_filename(filename)[0]
def format_validation_message(filename: str) -> str:
    """
    Build a user-facing validation message for a filename.

    Args:
        filename: Filename to report on

    Returns:
        A one-line confirmation when the filename is valid; otherwise a
        multi-line message listing the numbered errors, the expected
        format, an example and, when one can be derived, a suggested
        filename.

    Examples:
        >>> print(format_validation_message("manpage.json"))
        Invalid schema filename: manpage.json
        ...
    """
    errors = get_validation_errors(filename)
    if not errors:
        return f"\u2705 Valid schema filename: {filename}"

    pieces = [f"\u274c Invalid schema filename: {filename}\n\n", "Errors:\n"]
    pieces.extend(f" {i}. {error}\n" for i, error in enumerate(errors, 1))
    pieces.append("\nExpected format: {domain}-schema-v{major}.{minor}.md\n")
    pieces.append("Example: manpage-schema-v1.0.md\n")

    # Best effort: the suggestion is simply left out when no valid name can
    # be derived from the guessed domain.
    try:
        # Domain guess: everything before the first hyphen or dot.
        domain_guess = filename.split('-')[0].split('.')[0]
        suggestion = suggest_valid_filename(domain_guess, "1.0")
        pieces.append(f"\nSuggested filename: {suggestion}\n")
    except Exception:
        pass

    return "".join(pieces)

530
markitect/schema/refiner.py Normal file
View File

@@ -0,0 +1,530 @@
"""
Schema Refiner for Phase 2: Schema Refinement Tools
Automatically refines rigid schemas by applying loosening rules and fixes.
"""
from pathlib import Path
from typing import Dict, Any, List, Optional, Tuple
import json
import copy
from dataclasses import dataclass, field
from .analyzer import SchemaAnalyzer, SchemaIssue, IssueType, IssueSeverity
@dataclass
class RefinementAction:
    """Represents a refinement action taken on the schema."""
    # Type of the analyzer issue this action addresses.
    issue_type: IssueType
    # Path of the affected schema element, copied from the issue.
    path: str
    # Human-readable summary; printed as the headline of each report entry.
    description: str
    # Constraint value(s) before the change; shown as "Before" in the report
    # when not None.
    old_value: Any = None
    # Constraint value(s) after the change; shown as "After" in the report
    # when not None.
    new_value: Any = None
@dataclass
class RefinementResult:
    """Results of schema refinement."""
    # True once refinement ran to completion without raising.
    success: bool
    # Actions recorded while refining, in the order they were applied.
    actions_taken: List[RefinementAction] = field(default_factory=list)
    # Deep copy of the input schema with the fixes applied; None until
    # refinement completes.
    refined_schema: Optional[Dict[str, Any]] = None
    # String form of the exception that aborted refinement, if any.
    error_message: Optional[str] = None
class SchemaRefiner:
    """Refines rigid schemas by applying loosening rules.

    Issues are detected by a SchemaAnalyzer; each supported issue type has
    a ``_fix_*`` method that edits a deep copy of the schema in place and
    returns a RefinementAction describing the change (or None when the fix
    could not be applied).
    """

    def __init__(self):
        """Initialize the schema refiner."""
        self.analyzer = SchemaAnalyzer()

    @staticmethod
    def _find_container(obj: Any, key: str) -> Optional[Dict[str, Any]]:
        """Return the dict that holds ``key``: ``obj`` itself, else its 'properties', else None."""
        # Non-dict nodes (lists, scalars) cannot be navigated by name.
        if not isinstance(obj, dict):
            return None
        if key in obj:
            return obj
        properties = obj.get("properties")
        if isinstance(properties, dict) and key in properties:
            return properties
        return None

    def _navigate_to_path(self, schema: Dict[str, Any], path: str) -> Optional[Tuple[Dict[str, Any], str]]:
        """
        Navigate to a path in the schema, handling nested 'properties' objects.

        Each dot-separated part is looked up directly on the current
        object first and then inside its 'properties' mapping.

        Args:
            schema: Schema dictionary to search
            path: Dot-separated path (e.g. "sections.items")

        Returns:
            (parent_object, property_name) or None if path doesn't exist.
        """
        *ancestors, prop_name = path.split('.')

        obj: Any = schema
        for part in ancestors:
            container = self._find_container(obj, part)
            if container is None:
                return None
            obj = container[part]

        container = self._find_container(obj, prop_name)
        if container is None:
            return None
        return (container, prop_name)

    def _apply_fix(
        self,
        schema: Dict[str, Any],
        issue: SchemaIssue,
        loosen_counts: bool,
        migrate_deprecated: bool,
        round_numbers: bool
    ) -> Optional[RefinementAction]:
        """Dispatch ``issue`` to the fixer for its type, honouring the enable flags."""
        kind = issue.issue_type
        if loosen_counts and kind == IssueType.EXACT_COUNT:
            return self._fix_exact_count(schema, issue)
        if round_numbers and kind == IssueType.OVERLY_SPECIFIC:
            return self._fix_overly_specific(schema, issue)
        if loosen_counts and kind == IssueType.NO_FLEXIBILITY:
            return self._fix_no_flexibility(schema, issue)
        if migrate_deprecated and kind == IssueType.DEPRECATED_EXTENSIONS:
            return self._fix_deprecated_extension(schema, issue)
        return None

    def refine_schema_interactive(
        self,
        schema: Dict[str, Any],
        loosen_counts: bool = True,
        migrate_deprecated: bool = False,
        round_numbers: bool = True
    ) -> RefinementResult:
        """
        Refine a schema interactively, prompting for each fix.

        Every detected issue is printed and the user answers via stdin:
        'y' applies the fix, 'q' aborts (the result has success=False and
        no refined schema), anything else skips the issue.

        Args:
            schema: The JSON schema to refine (not modified)
            loosen_counts: Enable fixes for exact counts
            migrate_deprecated: Enable migration of deprecated extensions
            round_numbers: Enable rounding of overly specific numbers

        Returns:
            RefinementResult with actions taken and refined schema
        """
        result = RefinementResult(success=False)

        try:
            # Analyze the schema first
            analysis = self.analyzer.analyze_schema(schema)
            total = len(analysis.issues)
            print(f"\nFound {total} issue(s) to review\n")

            # Deep copy to avoid modifying original
            refined = copy.deepcopy(schema)

            for i, issue in enumerate(analysis.issues, 1):
                print(f"Issue {i}/{total}")
                print(f" Type: {issue.issue_type.value}")
                print(f" Path: {issue.path}")
                print(f" {issue.message}")
                print(f" Suggestion: {issue.suggestion}")
                if issue.current_value is not None:
                    print(f" Current: {json.dumps(issue.current_value)}")
                if issue.suggested_value is not None:
                    print(f" Suggested: {json.dumps(issue.suggested_value)}")

                # Ask user if they want to apply the fix
                response = input("\nApply this fix? [y/N/q]: ").strip().lower()
                if response == 'q':
                    print("Refinement cancelled by user")
                    result.success = False
                    return result

                if response == 'y':
                    action = self._apply_fix(
                        refined, issue, loosen_counts, migrate_deprecated, round_numbers
                    )
                    if action:
                        result.actions_taken.append(action)
                        print(" ✓ Applied")
                    else:
                        print(" ✗ Could not apply fix")
                else:
                    print(" - Skipped")
                print()

            result.refined_schema = refined
            result.success = True
        except Exception as e:
            # Failures are reported through the result rather than raised.
            result.error_message = str(e)

        return result

    def refine_schema(
        self,
        schema: Dict[str, Any],
        loosen_counts: bool = True,
        migrate_deprecated: bool = False,
        round_numbers: bool = True
    ) -> RefinementResult:
        """
        Refine a schema by applying fixes for detected issues.

        Args:
            schema: The JSON schema to refine (not modified)
            loosen_counts: Apply fixes for exact counts
            migrate_deprecated: Migrate deprecated extensions
            round_numbers: Round overly specific numbers

        Returns:
            RefinementResult with actions taken and refined schema
        """
        result = RefinementResult(success=False)

        try:
            # Analyze the schema first
            analysis = self.analyzer.analyze_schema(schema)

            # Deep copy to avoid modifying original
            refined = copy.deepcopy(schema)

            for issue in analysis.issues:
                action = self._apply_fix(
                    refined, issue, loosen_counts, migrate_deprecated, round_numbers
                )
                if action:
                    result.actions_taken.append(action)

            result.refined_schema = refined
            result.success = True
        except Exception as e:
            # Failures are reported through the result rather than raised.
            result.error_message = str(e)

        return result

    def _fix_exact_count(self, schema: Dict[str, Any], issue: SchemaIssue) -> Optional[RefinementAction]:
        """Fix exact count constraints by converting to ranges.

        Handles arrays whose minItems equals maxItems, and ``const``
        constraints (integers become a +/-1 range, anything else is
        simply dropped). Returns None when neither case applies.
        """
        nav_result = self._navigate_to_path(schema, issue.path)
        if not nav_result:
            return None

        obj, prop_name = nav_result
        prop_def = obj[prop_name]
        if not isinstance(prop_def, dict):
            return None

        # Array with exact minItems/maxItems
        if prop_def.get("type") == "array":
            min_items = prop_def.get("minItems")
            max_items = prop_def.get("maxItems")
            if min_items is not None and max_items is not None and min_items == max_items:
                # Loosen to [n - 2, n + 5], never below zero items.
                new_min = max(0, min_items - 2)
                new_max = min_items + 5
                prop_def["minItems"] = new_min
                prop_def["maxItems"] = new_max
                return RefinementAction(
                    issue_type=IssueType.EXACT_COUNT,
                    path=issue.path,
                    description=f"Loosened array count from exactly {min_items} to range {new_min}-{new_max}",
                    old_value={"minItems": min_items, "maxItems": max_items},
                    new_value={"minItems": new_min, "maxItems": new_max}
                )

        # Const value
        if "const" in prop_def:
            const_value = prop_def.pop("const")

            # bool is a subclass of int; a boolean const must not be turned
            # into a numeric range.
            if isinstance(const_value, int) and not isinstance(const_value, bool):
                prop_def["minimum"] = const_value - 1
                prop_def["maximum"] = const_value + 1
                return RefinementAction(
                    issue_type=IssueType.EXACT_COUNT,
                    path=issue.path,
                    description=f"Converted const {const_value} to range {const_value-1}-{const_value+1}",
                    old_value=const_value,
                    new_value={"minimum": const_value - 1, "maximum": const_value + 1}
                )

            # For non-numeric constants, just remove the constraint
            return RefinementAction(
                issue_type=IssueType.EXACT_COUNT,
                path=issue.path,
                description=f"Removed const constraint: {const_value}",
                old_value=const_value,
                new_value=None
            )

        return None

    def _fix_overly_specific(self, schema: Dict[str, Any], issue: SchemaIssue) -> Optional[RefinementAction]:
        """Fix overly specific number constraints by rounding.

        Replaces minItems with the analyzer's suggested value; returns
        None when there is no suggestion or no minItems to replace.
        """
        if issue.suggested_value is None:
            return None

        nav_result = self._navigate_to_path(schema, issue.path)
        if not nav_result:
            return None

        obj, prop_name = nav_result
        prop_def = obj[prop_name]

        if isinstance(prop_def, dict) and "minItems" in prop_def:
            old_value = prop_def["minItems"]
            new_value = issue.suggested_value
            prop_def["minItems"] = new_value
            return RefinementAction(
                issue_type=IssueType.OVERLY_SPECIFIC,
                path=issue.path,
                description=f"Rounded minItems from {old_value} to {new_value}",
                old_value=old_value,
                new_value=new_value
            )

        return None

    def _fix_no_flexibility(self, schema: Dict[str, Any], issue: SchemaIssue) -> Optional[RefinementAction]:
        """Fix narrow ranges by widening them by 5 on each side."""
        nav_result = self._navigate_to_path(schema, issue.path)
        if not nav_result:
            return None

        obj, prop_name = nav_result
        prop_def = obj[prop_name]

        if isinstance(prop_def, dict) and "minimum" in prop_def and "maximum" in prop_def:
            old_min = prop_def["minimum"]
            old_max = prop_def["maximum"]

            # Widen the range
            new_min = old_min - 5
            new_max = old_max + 5
            prop_def["minimum"] = new_min
            prop_def["maximum"] = new_max

            return RefinementAction(
                issue_type=IssueType.NO_FLEXIBILITY,
                path=issue.path,
                description=f"Widened range from {old_min}-{old_max} to {new_min}-{new_max}",
                old_value={"minimum": old_min, "maximum": old_max},
                new_value={"minimum": new_min, "maximum": new_max}
            )

        return None

    def _fix_deprecated_extension(self, schema: Dict[str, Any], issue: SchemaIssue) -> Optional[RefinementAction]:
        """Report a deprecated top-level extension without changing the schema.

        Automatic removal is considered too risky and a real migration
        would require understanding the old format, so the action only
        documents that manual migration is recommended.
        """
        deprecated_key = issue.path
        if deprecated_key in schema:
            return RefinementAction(
                issue_type=IssueType.DEPRECATED_EXTENSIONS,
                path=issue.path,
                description="Detected deprecated extension (manual migration recommended)",
                old_value=schema[deprecated_key],
                new_value=None
            )

        return None

    def refine_schema_file(
        self,
        input_path: Path,
        output_path: Optional[Path] = None,
        loosen_counts: bool = True,
        migrate_deprecated: bool = False,
        round_numbers: bool = True
    ) -> RefinementResult:
        """
        Refine a schema file.

        Args:
            input_path: Path to input schema file
            output_path: Path to output file (if None, overwrites input)
            loosen_counts: Apply fixes for exact counts
            migrate_deprecated: Migrate deprecated extensions
            round_numbers: Round overly specific numbers

        Returns:
            RefinementResult
        """
        # Read as UTF-8 explicitly instead of relying on the platform's
        # locale default encoding.
        with open(input_path, encoding='utf-8') as f:
            schema = json.load(f)

        result = self.refine_schema(
            schema,
            loosen_counts=loosen_counts,
            migrate_deprecated=migrate_deprecated,
            round_numbers=round_numbers
        )

        # Nothing is written when refinement failed or produced an empty schema.
        if result.success and result.refined_schema:
            destination = output_path or input_path
            with open(destination, 'w', encoding='utf-8') as f:
                json.dump(result.refined_schema, f, indent=2)

        return result

    def format_refinement_report(self, result: RefinementResult) -> str:
        """
        Format refinement results as a human-readable report.

        Args:
            result: Refinement results

        Returns:
            Formatted report string
        """
        lines = []

        # Header
        lines.append("=" * 70)
        lines.append("Schema Refinement Report")
        lines.append("=" * 70)
        lines.append("")

        if not result.success:
            lines.append(f"❌ Refinement failed: {result.error_message}")
            return "\n".join(lines)

        # Summary
        action_count = len(result.actions_taken)
        if action_count == 0:
            lines.append("✅ No refinements needed - schema is already flexible")
        else:
            lines.append(f"✅ Applied {action_count} refinement(s)")
        lines.append("")

        # List actions
        if result.actions_taken:
            lines.append("Actions Taken:")
            lines.append("-" * 70)
            for i, action in enumerate(result.actions_taken, 1):
                lines.append(f"{i}. {action.description}")
                lines.append(f" Path: {action.path}")
                if action.old_value is not None:
                    lines.append(f" Before: {json.dumps(action.old_value)}")
                if action.new_value is not None:
                    lines.append(f" After: {json.dumps(action.new_value)}")
                lines.append("")

        return "\n".join(lines)
def refine_schema_cli(
    schema_path: str,
    output: Optional[str] = None,
    loosen_counts: bool = True,
    migrate_deprecated: bool = False,
    round_numbers: bool = True,
    dry_run: bool = False,
    interactive: bool = False
) -> int:
    """
    CLI entry point for schema refinement.

    Loads the schema, runs the refiner in one of three modes (interactive,
    dry run, or direct file refinement), prints a report or summary and maps
    the outcome to an exit code. Exceptions raised while loading or refining
    are printed and reported as exit code 2.

    Args:
        schema_path: Path to schema file
        output: Output path (None = overwrite input)
        loosen_counts: Apply count loosening fixes
        migrate_deprecated: Migrate deprecated extensions
        round_numbers: Round overly specific numbers
        dry_run: Show changes without applying
        interactive: Prompt for each fix

    Returns:
        Exit code (0 = success, 1 = no changes needed, 2 = error)
    """
    refiner = SchemaRefiner()
    try:
        input_path = Path(schema_path)
        output_path = Path(output) if output else None

        # Load schema as UTF-8 rather than the locale default; doing it up
        # front means a missing or malformed file is reported in every mode
        with open(input_path, encoding="utf-8") as f:
            schema = json.load(f)

        if interactive:
            # Interactive mode - prompt for each fix
            print(f"Refining schema: {schema_path}")
            result = refiner.refine_schema_interactive(
                schema,
                loosen_counts=loosen_counts,
                migrate_deprecated=migrate_deprecated,
                round_numbers=round_numbers
            )
            if result.success and result.refined_schema and not dry_run:
                # Write the refined schema; a separate name keeps the
                # `output` argument from being rebound to a Path
                destination = output_path or input_path
                with open(destination, 'w', encoding="utf-8") as f:
                    json.dump(result.refined_schema, f, indent=2)
                print(f"\nRefined schema written to: {destination}")
        elif dry_run:
            # Just analyze and show what would be done
            result = refiner.refine_schema(
                schema,
                loosen_counts=loosen_counts,
                migrate_deprecated=migrate_deprecated,
                round_numbers=round_numbers
            )
            print("DRY RUN - No changes will be made")
            print()
        else:
            result = refiner.refine_schema_file(
                input_path,
                output_path,
                loosen_counts=loosen_counts,
                migrate_deprecated=migrate_deprecated,
                round_numbers=round_numbers
            )

        # Only print full report if not in interactive mode (user already saw changes)
        if not interactive:
            report = refiner.format_refinement_report(result)
            print(report)
        elif result.success:
            # Just print summary for interactive mode
            print(f"\n{'='*70}")
            print(f"Refinement complete: {len(result.actions_taken)} change(s) applied")
            print(f"{'='*70}")

        if result.success and len(result.actions_taken) > 0:
            return 0  # Success with changes
        elif result.success:
            return 1  # Success but no changes needed
        else:
            return 2  # Error
    except FileNotFoundError:
        print(f"Error: Schema file not found: {schema_path}")
        return 2
    except json.JSONDecodeError as e:
        print(f"Error: Invalid JSON in schema file: {e}")
        return 2
    except Exception as e:
        # Top-level CLI boundary: report anything unexpected as exit code 2
        print(f"Error: {e}")
        return 2

View File

@@ -0,0 +1,679 @@
"""
Schema Validator for Issue #7: Validate a Markdown File Against a Schema.
This module provides functionality to validate markdown documents against JSON schemas
for arc42 architecture documentation compliance checking - essential for intelligent
document analysis and plan-actual comparison capabilities.
"""
import json
from pathlib import Path
from typing import Dict, Any
try:
import jsonschema
from jsonschema import SchemaError
JSONSCHEMA_AVAILABLE = True
except ImportError:
# Fallback to basic validation without full JSON Schema validation
JSONSCHEMA_AVAILABLE = False
SchemaError = Exception
from markitect.core.parser import parse_markdown_to_ast
from .generator import SchemaGenerator
from markitect.validation_error import ValidationErrorCollector, ValidationErrorType
from markitect.exceptions import FileNotFoundError, SchemaValidationError, InvalidSchemaError
class SchemaValidator:
"""
Validates markdown documents against JSON schemas for arc42 compliance checking.
This service provides boolean validation results for markdown documents against
schemas, enabling strict compliance checking for architectural documentation
templates and intelligent plan-actual comparison.
"""
def __init__(self):
    """Create the schema generator and record whether jsonschema imported."""
    # Module-level flag set by the guarded jsonschema import at the top of the file
    self.jsonschema_available = JSONSCHEMA_AVAILABLE
    self.schema_generator = SchemaGenerator()
def validate_file_against_schema(self, file_path: Path, schema: Dict[str, Any]) -> bool:
    """
    Check whether a markdown file satisfies a JSON schema.

    Args:
        file_path: Path to the markdown file
        schema: JSON schema dictionary to validate against

    Returns:
        True if the document matches the schema, False otherwise

    Raises:
        FileNotFoundError: If the markdown file doesn't exist
        InvalidSchemaError: If the schema is invalid
        SchemaValidationError: If no schema can be generated from the document
    """
    # Input checks come first: the file must exist and the schema must be usable
    if not file_path.exists():
        raise FileNotFoundError(f"Markdown file not found: {file_path}")
    self._validate_schema(schema)

    # Describe the document's current structure as a schema of its own
    try:
        generated = self.schema_generator.generate_schema_from_file(file_path)
    except Exception as e:
        raise SchemaValidationError(f"Failed to generate document schema: {e}") from e

    # Schemas without enum constraints on heading text use the plain
    # structure comparison; the others also need the actual heading content
    needs_text_check = self._has_heading_text_constraints(schema)
    if not needs_text_check:
        return self._compare_structures(generated, schema)
    return self._validate_with_heading_text_constraints(file_path, schema, generated)
def validate_file_against_schema_string(self, file_path: Path, schema_json: str) -> bool:
    """
    Check a markdown file against a JSON schema given as text.

    Args:
        file_path: Path to the markdown file
        schema_json: JSON schema as a string

    Returns:
        True if the document matches the schema, False otherwise

    Raises:
        FileNotFoundError: If the markdown file doesn't exist
        InvalidSchemaError: If the schema is invalid JSON or schema
    """
    try:
        parsed_schema = json.loads(schema_json)
    except json.JSONDecodeError as e:
        raise InvalidSchemaError(f"Invalid JSON schema string: {e}") from e
    else:
        # Only the parsing is guarded; validation errors propagate unchanged
        return self.validate_file_against_schema(file_path, parsed_schema)
def validate_file_against_schema_file(self, file_path: Path, schema_file_path: Path) -> bool:
    """
    Validate a markdown file against a schema stored in a file.

    Args:
        file_path: Path to the markdown file
        schema_file_path: Path to the JSON schema file

    Returns:
        True if the document matches the schema, False otherwise

    Raises:
        FileNotFoundError: If either file doesn't exist
        InvalidSchemaError: If the schema file cannot be read, is not valid
            UTF-8, or does not contain valid JSON
    """
    if not schema_file_path.exists():
        raise FileNotFoundError(f"Schema file not found: {schema_file_path}")

    try:
        schema = json.loads(schema_file_path.read_text(encoding='utf-8'))
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and also UnicodeDecodeError,
        # which read_text raises for a file that is not valid UTF-8
        raise InvalidSchemaError(f"Failed to load schema file {schema_file_path}: {e}") from e

    return self.validate_file_against_schema(file_path, schema)
def _validate_schema(self, schema: Dict[str, Any]) -> None:
    """
    Validate that a schema is a valid JSON Schema.

    Two basic checks always run: the schema must be a dictionary and must
    carry non-empty '$schema' and 'type' entries. When the jsonschema
    library is available the schema is additionally checked against its
    meta-schema.

    Args:
        schema: Schema dictionary to validate

    Raises:
        InvalidSchemaError: If the schema is invalid
    """
    # The basic checks stay outside the try block. Without jsonschema the
    # module aliases SchemaError to Exception, so a handler wrapped around
    # them would catch these InvalidSchemaErrors and re-wrap the message.
    if not isinstance(schema, dict):
        raise InvalidSchemaError("Schema must be a dictionary")
    if not schema.get('$schema') or not schema.get('type'):
        raise InvalidSchemaError("Schema must have '$schema' and 'type' fields")

    if not self.jsonschema_available:
        return

    # Full meta-schema validation through the jsonschema library
    try:
        jsonschema.validators.validator_for(schema).check_schema(schema)
    except (SchemaError, TypeError, AttributeError) as e:
        raise InvalidSchemaError(f"Invalid JSON schema: {e}") from e
def _compare_structures(self, document_schema: Dict[str, Any], expected_schema: Dict[str, Any]) -> bool:
    """
    Decide whether a document's generated schema satisfies an expected schema.

    Three things are checked in order: every required top-level property
    exists, the heading structure matches (when both schemas describe
    headings), and each structural element named in the expected schema
    meets its constraints.

    Args:
        document_schema: Schema generated from the actual document
        expected_schema: Expected schema requirements

    Returns:
        True if the document satisfies the expected schema requirements
    """
    try:
        found = document_schema.get('properties', {})
        wanted = expected_schema.get('properties', {})

        # Every required property has to show up in the document
        if any(name not in found for name in expected_schema.get('required', [])):
            return False

        # Headings are compared only when both sides describe them
        if 'headings' in wanted and 'headings' in found:
            if not self._validate_heading_structure(found['headings'], wanted['headings']):
                return False

        # Remaining structural elements; all() stops at the first failure
        return all(
            self._validate_structural_element(found.get(kind), wanted[kind])
            for kind in ('paragraphs', 'lists', 'code_blocks', 'blockquotes', 'tables')
            if kind in wanted
        )
    except Exception:
        # Any failure while comparing counts as a failed validation
        return False
def _validate_heading_structure(self, actual_headings: Dict[str, Any], expected_headings: Dict[str, Any]) -> bool:
    """
    Validate heading structure against expected requirements.

    Args:
        actual_headings: Actual heading structure from document
        expected_headings: Expected heading requirements

    Returns:
        True if heading structure meets requirements
    """
    actual_levels = actual_headings.get('properties', {})
    expected_levels = expected_headings.get('properties', {})
    required_levels = expected_headings.get('required', [])

    # Every required heading level must be present in the document
    if any(level not in actual_levels for level in required_levels):
        return False

    # A level missing from the document at this point is optional, because
    # missing required levels were rejected above, so it is simply skipped
    for level, expected_spec in expected_levels.items():
        if level not in actual_levels:
            continue
        # Check minimum and maximum item requirements
        if not self._validate_array_constraints(actual_levels[level], expected_spec):
            return False

    return True
def _validate_structural_element(self, actual_element: Dict[str, Any], expected_element: Dict[str, Any]) -> bool:
    """
    Check one structural element (paragraphs, lists, etc.) against its requirements.

    Args:
        actual_element: Actual element structure from document, or None
            when the document has no such element
        expected_element: Expected element requirements

    Returns:
        True if element meets requirements
    """
    # An element the document lacks can never satisfy an expectation
    return (
        actual_element is not None
        and self._validate_array_constraints(actual_element, expected_element)
    )
def _validate_array_constraints(self, actual: Dict[str, Any], expected: Dict[str, Any]) -> bool:
    """
    Check an element count against the expected minItems/maxItems bounds.

    Args:
        actual: Actual element specification
        expected: Expected element specification

    Returns:
        True if constraints are satisfied
    """
    # The document's count is read from 'maxItems', falling back to
    # 'minItems' and then 0 (generated schemas use min == max == count)
    observed = actual.get('maxItems', actual.get('minItems', 0))

    # Missing bounds mean "no lower limit" and "no upper limit"
    lower = expected.get('minItems', 0)
    upper = expected.get('maxItems', float('inf'))

    if not lower <= observed:
        return False
    return observed <= upper
# Issue #8: Detailed Error Reporting Methods
def validate_file_with_errors(self, file_path: Path, schema: Dict[str, Any]) -> ValidationErrorCollector:
    """
    Validate a markdown file against a JSON schema and collect detailed errors.

    Rather than a yes/no answer, this returns a collector describing each
    way the document deviates from the schema.

    Args:
        file_path: Path to the markdown file
        schema: JSON schema dictionary to validate against

    Returns:
        ValidationErrorCollector with all validation errors

    Raises:
        FileNotFoundError: If the markdown file doesn't exist
        InvalidSchemaError: If the schema is invalid
    """
    # Input checks: the file must exist and the schema must be usable
    if not file_path.exists():
        raise FileNotFoundError(f"Markdown file not found: {file_path}")
    self._validate_schema(schema)

    collected = ValidationErrorCollector()

    # A document that cannot be analysed is reported as one structural
    # error instead of raising
    try:
        generated = self.schema_generator.generate_schema_from_file(file_path)
    except Exception as e:
        collected.add_error(
            ValidationErrorType.STRUCTURAL_VIOLATION,
            f"Failed to generate document schema: {e}",
            "document.structure",
            suggestion="Check if the markdown file is properly formatted"
        )
        return collected

    # The structure comparison always runs; schemas with enum constraints on
    # heading text get an additional content check afterwards
    check_heading_text = self._has_heading_text_constraints(schema)
    self._compare_structures_with_errors(generated, schema, collected)
    if check_heading_text:
        self._validate_heading_text_constraints_with_errors(file_path, schema, collected)

    return collected
def validate_file_with_errors_string(self, file_path: Path, schema_json: str) -> ValidationErrorCollector:
    """
    Collect detailed validation errors for a schema given as JSON text.

    Args:
        file_path: Path to the markdown file
        schema_json: JSON schema as a string

    Returns:
        ValidationErrorCollector with all validation errors

    Raises:
        FileNotFoundError: If the markdown file doesn't exist
        InvalidSchemaError: If the schema is invalid JSON or schema
    """
    try:
        parsed_schema = json.loads(schema_json)
    except json.JSONDecodeError as e:
        raise InvalidSchemaError(f"Invalid JSON schema string: {e}") from e
    else:
        # Only the parsing is guarded; validation errors propagate unchanged
        return self.validate_file_with_errors(file_path, parsed_schema)
def validate_file_with_errors_file(self, file_path: Path, schema_file_path: Path) -> ValidationErrorCollector:
    """
    Validate a markdown file against a schema file and collect detailed errors.

    Args:
        file_path: Path to the markdown file
        schema_file_path: Path to the JSON schema file

    Returns:
        ValidationErrorCollector with all validation errors

    Raises:
        FileNotFoundError: If either file doesn't exist
        InvalidSchemaError: If the schema file cannot be read, is not valid
            UTF-8, or does not contain valid JSON
    """
    if not schema_file_path.exists():
        raise FileNotFoundError(f"Schema file not found: {schema_file_path}")

    try:
        schema = json.loads(schema_file_path.read_text(encoding='utf-8'))
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and also UnicodeDecodeError,
        # which read_text raises for a file that is not valid UTF-8
        raise InvalidSchemaError(f"Failed to load schema file {schema_file_path}: {e}") from e

    return self.validate_file_with_errors(file_path, schema)
def _compare_structures_with_errors(
    self,
    document_schema: Dict[str, Any],
    expected_schema: Dict[str, Any],
    error_collector: ValidationErrorCollector
) -> None:
    """
    Compare document structure against the expected schema, recording each deviation.

    Missing required sections, heading problems and element-count problems
    are all added to the collector. An exception raised during the
    comparison is recorded as a structural violation rather than propagated.

    Args:
        document_schema: Schema generated from the actual document
        expected_schema: Expected schema requirements
        error_collector: Collector to accumulate validation errors
    """
    try:
        found = document_schema.get('properties', {})
        wanted = expected_schema.get('properties', {})

        # One error per required property the document lacks
        for prop in expected_schema.get('required', []):
            if prop in found:
                continue
            error_collector.add_error(
                ValidationErrorType.MISSING_REQUIRED_SECTION,
                f"Missing required section: '{prop}'",
                f"document.{prop}",
                expected=f"Section '{prop}' is required by schema",
                actual="Section not found",
                suggestion=f"Add the '{prop}' section to your document"
            )

        # Headings are compared only when both sides describe them
        if 'headings' in wanted and 'headings' in found:
            self._validate_heading_structure_with_errors(
                found['headings'],
                wanted['headings'],
                error_collector
            )

        # Remaining structural elements named in the expected schema
        for element in ('paragraphs', 'lists', 'code_blocks', 'blockquotes', 'tables'):
            if element not in wanted:
                continue
            self._validate_structural_element_with_errors(
                found.get(element),
                wanted[element],
                element,
                error_collector
            )
    except Exception as e:
        error_collector.add_error(
            ValidationErrorType.STRUCTURAL_VIOLATION,
            f"Error during structure comparison: {e}",
            "document.structure",
            suggestion="Check if both the document and schema are properly formatted"
        )
def _validate_heading_structure_with_errors(
    self,
    actual_headings: Dict[str, Any],
    expected_headings: Dict[str, Any],
    error_collector: ValidationErrorCollector
) -> None:
    """
    Validate heading structure and collect detailed errors.

    Args:
        actual_headings: Actual heading structure from document
        expected_headings: Expected heading requirements
        error_collector: Collector for validation errors
    """
    actual_levels = actual_headings.get('properties', {})
    expected_levels = expected_headings.get('properties', {})
    required_levels = expected_headings.get('required', [])

    # Report every required heading level the document lacks
    for level in required_levels:
        if level in actual_levels:
            continue
        level_num = level.replace('level_', '')
        try:
            example = f" (e.g., {'#' * int(level_num)} Heading)"
        except ValueError:
            # Key without a numeric suffix: drop the '#' example instead of
            # letting the conversion error abort the whole comparison
            example = ""
        error_collector.add_error(
            ValidationErrorType.MISSING_REQUIRED_HEADING,
            f"Missing required heading level {level_num}",
            f"headings.{level}",
            expected=f"At least one heading at level {level_num}",
            actual="No headings found at this level",
            suggestion=f"Add heading(s) at level {level_num}{example}"
        )

    # Count constraints apply only to levels the document actually has:
    # absent required levels were reported above, absent optional ones are fine
    for level, expected_spec in expected_levels.items():
        if level not in actual_levels:
            continue
        level_num = level.replace('level_', '')
        # Check minimum and maximum item requirements
        self._validate_array_constraints_with_errors(
            actual_levels[level],
            expected_spec,
            f"headings.{level}",
            f"level {level_num} headings",
            error_collector
        )
def _validate_structural_element_with_errors(
    self,
    actual_element: Dict[str, Any],
    expected_element: Dict[str, Any],
    element_name: str,
    error_collector: ValidationErrorCollector
) -> None:
    """
    Check one structural element and record any problems.

    Args:
        actual_element: Actual element structure from document, or None
            when the document has no such element
        expected_element: Expected element requirements
        element_name: Name of the structural element (for error messages)
        error_collector: Collector for validation errors
    """
    location = f"content.{element_name}"

    # Present element: only its count constraints remain to be checked
    if actual_element is not None:
        self._validate_array_constraints_with_errors(
            actual_element,
            expected_element,
            location,
            element_name,
            error_collector
        )
        return

    # Absent element: report it as missing
    error_collector.add_error(
        ValidationErrorType.MISSING_REQUIRED_SECTION,
        f"Missing required structural element: {element_name}",
        location,
        expected=f"Document should contain {element_name}",
        actual="Element not found",
        suggestion=f"Add {element_name} to your document"
    )
def _validate_array_constraints_with_errors(
    self,
    actual: Dict[str, Any],
    expected: Dict[str, Any],
    path: str,
    element_description: str,
    error_collector: ValidationErrorCollector
) -> None:
    """
    Check an element count against minItems/maxItems and record violations.

    Args:
        actual: Actual element specification
        expected: Expected element specification
        path: JSON path for error location
        element_description: Human-readable element description
        error_collector: Collector for validation errors
    """
    # The document's count is read from 'maxItems', falling back to
    # 'minItems' and then 0 (generated schemas use min == max == count)
    observed = actual.get('maxItems', actual.get('minItems', 0))
    lower = expected.get('minItems', 0)
    upper = expected.get('maxItems', float('inf'))
    found_text = f"{observed} {element_description}"

    # Too few items
    if observed < lower:
        error_collector.add_error(
            ValidationErrorType.INSUFFICIENT_CONTENT,
            f"Insufficient {element_description}: found {observed}, required at least {lower}",
            path,
            expected=f"At least {lower} {element_description}",
            actual=found_text,
            suggestion=f"Add {lower - observed} more {element_description}"
        )

    # Too many items; skipped when the schema sets no upper bound
    if upper != float('inf') and observed > upper:
        error_collector.add_error(
            ValidationErrorType.EXCESS_CONTENT,
            f"Too many {element_description}: found {observed}, maximum allowed {upper}",
            path,
            expected=f"At most {upper} {element_description}",
            actual=found_text,
            suggestion=f"Remove {observed - upper} {element_description}"
        )
def _has_heading_text_constraints(self, schema: Dict[str, Any]) -> bool:
    """
    Report whether any heading level restricts its text through an enum.

    The check looks at properties.headings.properties.<level>.items
    .properties.content for an 'enum' key.

    Args:
        schema: JSON schema to check

    Returns:
        True if schema has heading text constraints
    """
    heading_levels = schema.get('properties', {}).get('headings', {}).get('properties', {})
    return any(
        'enum' in level_spec.get('items', {}).get('properties', {}).get('content', {})
        for level_spec in heading_levels.values()
    )
def _validate_with_heading_text_constraints(
    self,
    file_path: Path,
    expected_schema: Dict[str, Any],
    document_schema: Dict[str, Any]
) -> bool:
    """
    Validate a document whose schema restricts heading text via enum values.

    The structural comparison runs first. Only if it passes is the file
    re-read so that each heading's text can be checked against the enum of
    its level.

    Args:
        file_path: Path to the markdown file
        expected_schema: Schema with heading text constraints
        document_schema: Generated schema from the actual document

    Returns:
        True if document meets all constraints including heading text
    """
    # Structural compliance is a precondition for the text check
    if not self._compare_structures(document_schema, expected_schema):
        return False

    expected_levels = expected_schema.get('properties', {}).get('headings', {}).get('properties', {})

    # Analyse the file itself to obtain the actual heading content
    markdown_text = file_path.read_text(encoding='utf-8')
    tokens = parse_markdown_to_ast(markdown_text)
    analysis = self.schema_generator._analyze_ast_structure(tokens, None)

    for level_key, level_spec in expected_levels.items():
        content_rule = level_spec.get('items', {}).get('properties', {}).get('content', {})
        if 'enum' not in content_rule:
            continue
        permitted = content_rule['enum']
        # One heading outside the permitted list fails the whole document
        if any(
            heading['content'] not in permitted
            for heading in analysis['headings'].get(level_key, [])
        ):
            return False

    return True
def _validate_heading_text_constraints_with_errors(
    self,
    file_path: Path,
    expected_schema: Dict[str, Any],
    error_collector: ValidationErrorCollector
) -> None:
    """
    Validate heading text constraints and collect detailed errors.

    For every heading level whose schema carries an enum on its content,
    each heading of that level in the file is compared with the allowed
    texts; one error is recorded per heading that is not in the list.

    Args:
        file_path: Path to the markdown file
        expected_schema: Schema with heading text constraints
        error_collector: Collector for validation errors
    """
    expected_levels = expected_schema.get('properties', {}).get('headings', {}).get('properties', {})

    # Generate document analysis with actual heading content
    content = file_path.read_text(encoding='utf-8')
    ast_tokens = parse_markdown_to_ast(content)
    structure_analysis = self.schema_generator._analyze_ast_structure(ast_tokens, None)

    for level_key, level_spec in expected_levels.items():
        content_constraints = level_spec.get('items', {}).get('properties', {}).get('content', {})
        if 'enum' not in content_constraints:
            continue
        allowed_texts = content_constraints['enum']
        actual_headings = structure_analysis['headings'].get(level_key, [])
        for i, heading in enumerate(actual_headings):
            actual_text = heading['content']
            if actual_text in allowed_texts:
                continue
            # str() each entry: a JSON enum may hold non-string members, and
            # str.join raises TypeError on those
            allowed_display = ', '.join(map(str, allowed_texts))
            # NOTE(review): text mismatches are reported under
            # HEADING_COUNT_MISMATCH; confirm whether a dedicated type exists
            error_collector.add_error(
                ValidationErrorType.HEADING_COUNT_MISMATCH,
                f"Heading text mismatch at {level_key.replace('_', ' ')} #{i+1}: expected one of {allowed_texts}, found '{actual_text}'",
                f"headings.{level_key}[{i}].content",
                expected=f"One of: {allowed_texts}",
                actual=actual_text,
                suggestion=f"Change heading text to one of the allowed values: {allowed_display}"
            )