Files
markitect-main/markitect/schema/generator.py
tegwick 60f33443ae
Some checks failed
Test Suite / unit-tests (3.11) (push) Has been cancelled
Test Suite / unit-tests (3.12) (push) Has been cancelled
Test Suite / code-quality (push) Has been cancelled
Test Suite / security-scan (push) Has been cancelled
Test Suite / integration-tests (push) Has been cancelled
Test Suite / e2e-tests (push) Has been cancelled
Test Suite / performance-tests (push) Has been cancelled
Test Suite / test-summary (push) Has been cancelled
feat(schema): add semantic schema generation as default mode
schema-generate now builds content-aware schemas from the document's
section hierarchy instead of counting markdown syntax elements. Detects
key-value tables, data tables, link lists, and mixed content patterns
to produce schemas that reflect the actual document outline.

Old behavior preserved via --mode syntactic. Validator and visualization
tools pinned to syntactic mode for compatibility.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 18:49:50 +01:00

820 lines
32 KiB
Python

"""
Schema Generator for Issue #5: Generate a Schema from a Markdown File.
This module provides functionality to analyze markdown AST structures and generate
JSON schemas that describe the document's structural elements with configurable
depth limitations for architectural documentation analysis.
Supports three generation modes:
- semantic (default): Builds a content-aware schema from the document's section
hierarchy, detecting key-value tables, data tables, lists, and mixed content patterns.
- syntactic: Counts markdown elements by type (legacy behavior).
- outline: Legacy element-counting schema tagged with outline-mode extensions.
"""
import json
import re
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Any, Optional, Set
from markitect.core.parser import parse_markdown_to_ast
from markitect.exceptions import FileNotFoundError, InvalidDepthError, InvalidInstructionTypeError
class SchemaGenerator:
"""
Generates JSON schemas from markdown file AST structures.
Analyzes the structural elements of markdown documents and creates
JSON schemas that can be used for validation and compliance checking
in architecture documentation workflows.
"""
def __init__(self):
    """Initialize the schema generator.

    Stores the JSON Schema draft-07 meta-schema URI in
    ``default_schema_url``; the schema builders of this class emit it as
    the ``$schema`` value of every generated schema.
    """
    self.default_schema_url = "http://json-schema.org/draft-07/schema#"
def generate_schema_from_file(
    self,
    file_path: Path,
    max_depth: Optional[int] = None,
    mode: Optional[str] = None,
    outline_depth: Optional[int] = None,
    capture_heading_text: bool = False,
    include_content_instructions: bool = False,
    instruction_type: str = 'description'
) -> Dict[str, Any]:
    """
    Generate a JSON schema from a markdown file's AST structure.

    Args:
        file_path: Path to the markdown file (a plain string is accepted too)
        max_depth: Maximum heading depth to include (None = unlimited)
        mode: Generation mode. 'syntactic' and 'outline' select the legacy
            element-counting schema; any other value (including None and
            'semantic') selects the content-aware semantic schema.
        outline_depth: Depth limit recorded in the schema for outline mode
        capture_heading_text: Whether to capture exact heading text as constraints
        include_content_instructions: Whether to include content instruction fields
        instruction_type: Type of content instructions
            ('description', 'example', 'constraint', 'template')

    Returns:
        JSON schema as a dictionary

    Raises:
        FileNotFoundError: If the markdown file doesn't exist
        InvalidDepthError: If max_depth is invalid (< 1)
        InvalidInstructionTypeError: If instruction_type is not one of the
            supported values
    """
    # Accept str as well as Path so callers are not forced to wrap paths.
    file_path = Path(file_path)

    # Validate inputs
    if not file_path.exists():
        raise FileNotFoundError(f"Markdown file not found: {file_path}")
    if max_depth is not None and max_depth < 1:
        raise InvalidDepthError(f"max_depth must be >= 1, got: {max_depth}")

    # A tuple (not a set) keeps the order of the names in the error message
    # stable from run to run.
    valid_instruction_types = ('description', 'example', 'constraint', 'template')
    if instruction_type not in valid_instruction_types:
        raise InvalidInstructionTypeError(
            f"Invalid instruction type '{instruction_type}'. "
            f"Must be one of: {', '.join(valid_instruction_types)}"
        )

    # Read and parse the markdown file
    content = file_path.read_text(encoding='utf-8')
    ast_tokens = parse_markdown_to_ast(content)

    # Heading-text capture and content instructions only exist in the
    # syntactic builder, so requesting them without an explicit mode
    # switches to syntactic mode.
    effective_mode = mode
    if effective_mode is None and (capture_heading_text or include_content_instructions):
        effective_mode = 'syntactic'

    if effective_mode in ('syntactic', 'outline'):
        # Legacy: syntactic element-counting schema
        structure_analysis = self._analyze_ast_structure(ast_tokens, max_depth)
        return self._create_json_schema(
            structure_analysis,
            file_path.name,
            mode=effective_mode,
            outline_depth=outline_depth,
            capture_heading_text=capture_heading_text,
            include_content_instructions=include_content_instructions,
            instruction_type=instruction_type
        )

    # Default: semantic content-aware schema
    return self._create_semantic_schema(ast_tokens, file_path.name, max_depth)
# =========================================================================
# Semantic schema generation (default mode)
# =========================================================================
@staticmethod
def _slugify(text: str) -> str:
"""Convert heading or label text to a valid JSON property key."""
replacements = {
'ä': 'ae', 'ö': 'oe', 'ü': 'ue',
'Ä': 'Ae', 'Ö': 'Oe', 'Ü': 'Ue', 'ß': 'ss',
}
slug = text
for char, repl in replacements.items():
slug = slug.replace(char, repl)
slug = slug.lower()
slug = re.sub(r'[^a-z0-9]+', '_', slug)
slug = slug.strip('_')
return slug or 'feld'
def _build_section_tree(
self, tokens: List[Dict[str, Any]], max_depth: Optional[int] = None
) -> Dict[str, Any]:
"""
Build a hierarchical section tree from flat markdown-it token list.
Returns a root node with children. Each node has:
- heading: str (None for root)
- level: int (0 for root)
- slug: str
- content_tokens: list of non-heading tokens belonging to this section
- children: list of sub-sections
"""
root = {
'heading': None, 'level': 0, 'slug': '',
'content_tokens': [], 'children': []
}
stack = [root]
i = 0
while i < len(tokens):
token = tokens[i]
if token.get('type') == 'heading_open':
level = self._extract_heading_level(token.get('tag', ''))
heading_text = self._extract_heading_content(tokens, i)
if max_depth is not None and level > max_depth:
# Skip this heading and its close token, but keep content
i += 1
while i < len(tokens) and tokens[i].get('type') != 'heading_close':
i += 1
i += 1
continue
section = {
'heading': heading_text,
'level': level,
'slug': self._slugify(heading_text),
'content_tokens': [],
'children': []
}
# Pop stack until we find the parent (level < current)
while len(stack) > 1 and stack[-1]['level'] >= level:
stack.pop()
stack[-1]['children'].append(section)
stack.append(section)
# Skip past heading_close
i += 1
while i < len(tokens) and tokens[i].get('type') != 'heading_close':
i += 1
else:
# Add content token to current section
stack[-1]['content_tokens'].append(token)
i += 1
return root
def _extract_table_data(
self, tokens: List[Dict[str, Any]], start_index: int
) -> Dict[str, Any]:
"""Extract structured table data: headers and body rows."""
headers = []
rows = []
in_thead = False
current_row = []
i = start_index + 1 # skip table_open
while i < len(tokens):
ttype = tokens[i].get('type', '')
if ttype == 'table_close':
break
elif ttype == 'thead_open':
in_thead = True
elif ttype == 'thead_close':
in_thead = False
elif ttype == 'tr_open':
current_row = []
elif ttype == 'tr_close':
if in_thead:
headers = current_row
else:
rows.append(current_row)
elif ttype == 'inline':
current_row.append(tokens[i].get('content', '').strip())
i += 1
return {'headers': headers, 'rows': rows}
def _find_table_in_tokens(
self, content_tokens: List[Dict[str, Any]]
) -> Optional[Dict[str, Any]]:
"""Find and extract the first table in a section's content tokens."""
for i, token in enumerate(content_tokens):
if token.get('type') == 'table_open':
return self._extract_table_data(content_tokens, i)
return None
def _extract_list_items_text(
self, content_tokens: List[Dict[str, Any]]
) -> List[str]:
"""Extract text content of top-level list items from section tokens."""
items = []
in_list_item = False
nesting = 0
item_text_parts = []
for token in content_tokens:
ttype = token.get('type', '')
if ttype == 'list_item_open':
if nesting == 0:
in_list_item = True
item_text_parts = []
nesting += 1
elif ttype == 'list_item_close':
nesting -= 1
if nesting == 0:
in_list_item = False
items.append(' '.join(item_text_parts).strip())
elif ttype == 'inline' and in_list_item and nesting == 1:
item_text_parts.append(token.get('content', ''))
return items
@staticmethod
def _is_key_value_table(table_data: Dict[str, Any]) -> bool:
"""Detect if a table is a 2-column key-value table (empty headers, 2 cols per row)."""
if not table_data or not table_data.get('rows'):
return False
# All rows must have exactly 2 columns
if not all(len(row) == 2 for row in table_data['rows']):
return False
# Headers must be empty or absent
if table_data.get('headers'):
if not all(h.strip() == '' for h in table_data['headers']):
return False
return True
@staticmethod
def _has_top_level_paragraphs(tokens: List[Dict[str, Any]]) -> bool:
"""Check for paragraph tokens that are NOT nested inside list items."""
list_depth = 0
for t in tokens:
ttype = t.get('type', '')
if ttype in ('bullet_list_open', 'ordered_list_open'):
list_depth += 1
elif ttype in ('bullet_list_close', 'ordered_list_close'):
list_depth -= 1
elif ttype == 'paragraph_open' and list_depth == 0:
return True
return False
def _section_to_schema(self, section: Dict[str, Any]) -> Dict[str, Any]:
"""Convert a section tree node into its JSON schema representation."""
content_tokens = section['content_tokens']
children = section['children']
heading = section.get('heading', '')
# Detect content types present in this section
table_data = self._find_table_in_tokens(content_tokens)
has_list = any(
t.get('type') in ('bullet_list_open', 'ordered_list_open')
for t in content_tokens
)
has_paragraphs = self._has_top_level_paragraphs(content_tokens)
# --- Case 1: Key-value table → object with named properties ---
if table_data and self._is_key_value_table(table_data):
properties = {}
used_keys: set = set()
for row in table_data['rows']:
key = self._slugify(row[0])
# Deduplicate keys
original_key = key
counter = 2
while key in used_keys:
key = f"{original_key}_{counter}"
counter += 1
used_keys.add(key)
properties[key] = {
"type": "string",
"description": row[0]
}
schema: Dict[str, Any] = {
"type": "object",
"description": heading,
"properties": properties
}
# Merge child sections as additional properties
for child in children:
schema["properties"][child['slug']] = self._section_to_schema(child)
return schema
# --- Case 2: Data table with meaningful headers → array of objects ---
if table_data and not self._is_key_value_table(table_data) and table_data.get('headers'):
item_properties = {}
for hdr in table_data['headers']:
key = self._slugify(hdr)
if key:
item_properties[key] = {"type": "string", "description": hdr}
return {
"type": "array",
"description": heading,
"items": {
"type": "object",
"properties": item_properties
}
}
# --- Case 3: Pure list (no child sections) ---
if has_list and not children:
list_items = self._extract_list_items_text(content_tokens)
items_have_links = any('[' in it and '](' in it for it in list_items)
if has_paragraphs:
# Mixed: paragraphs + list
item_schema: Any = (
{
"type": "object",
"properties": {
"name": {"type": "string"},
"link": {"type": "string", "format": "uri"}
},
"required": ["name"]
}
if items_have_links
else {"type": "string"}
)
return {
"type": "object",
"description": heading,
"properties": {
"freitext": {"type": "string"},
"eintraege": {"type": "array", "items": item_schema}
}
}
# Pure list
if items_have_links:
return {
"type": "array",
"description": heading,
"items": {
"type": "object",
"properties": {
"name": {"type": "string"},
"link": {"type": "string", "format": "uri"}
},
"required": ["name"]
}
}
return {
"type": "array",
"description": heading,
"items": {"type": "string"}
}
# --- Case 4: Section with child sub-sections ---
if children:
properties: Dict[str, Any] = {}
# Direct content before first child
if has_paragraphs or has_list:
if has_list:
properties["eintraege"] = {
"type": "array",
"items": {"type": "string"}
}
else:
properties["inhalt"] = {"type": "string"}
for child in children:
properties[child['slug']] = self._section_to_schema(child)
return {
"type": "object",
"description": heading,
"properties": properties
}
# --- Case 5: Text-only section ---
if has_paragraphs:
return {
"type": "string",
"description": heading
}
# --- Default: empty or unrecognized section ---
return {
"type": "string",
"description": heading
}
def _create_semantic_schema(
self,
tokens: List[Dict[str, Any]],
filename: str,
max_depth: Optional[int] = None
) -> Dict[str, Any]:
"""Create a semantic JSON schema from the document's section hierarchy."""
tree = self._build_section_tree(tokens, max_depth)
schema = {
"$schema": self.default_schema_url,
"type": "object",
"title": f"Schema from {filename}",
"description": f"Semantic schema describing the content structure of {filename}",
"properties": {}
}
# Build properties from top-level sections
for section in tree['children']:
section_schema = self._section_to_schema(section)
schema["properties"][section['slug']] = section_schema
return schema
# =========================================================================
# Syntactic schema generation (legacy mode: --mode syntactic / --mode outline)
# =========================================================================
def _analyze_ast_structure(self, tokens: List[Dict[str, Any]], max_depth: Optional[int]) -> Dict[str, Any]:
"""
Analyze AST tokens to extract structural patterns (element counting).
Args:
tokens: List of AST tokens from markdown-it
max_depth: Maximum heading depth to analyze
Returns:
Dictionary containing structural analysis
"""
analysis = {
'headings': defaultdict(list),
'paragraphs': [],
'lists': [],
'code_blocks': [],
'blockquotes': [],
'tables': [],
'links': [],
'images': [],
'emphasis': [],
'structure_types': set()
}
current_heading_level = 0
i = 0
while i < len(tokens):
token = tokens[i]
token_type = token.get('type', '')
# Track all structural types found
analysis['structure_types'].add(token_type)
# Analyze headings with depth filtering
if token_type == 'heading_open':
level = self._extract_heading_level(token.get('tag', ''))
if max_depth is None or level <= max_depth:
heading_content = self._extract_heading_content(tokens, i)
analysis['headings'][f'level_{level}'].append({
'content': heading_content,
'level': level,
'position': i
})
current_heading_level = level
# Analyze paragraphs
elif token_type == 'paragraph_open':
paragraph_content = self._extract_paragraph_content(tokens, i)
analysis['paragraphs'].append({
'content': paragraph_content,
'position': i,
'under_heading_level': current_heading_level
})
# Analyze lists
elif token_type in ['bullet_list_open', 'ordered_list_open']:
list_structure = self._extract_list_structure(tokens, i)
analysis['lists'].append({
'type': 'bullet' if token_type == 'bullet_list_open' else 'ordered',
'structure': list_structure,
'position': i,
'under_heading_level': current_heading_level
})
# Analyze code blocks
elif token_type == 'code_block' or token_type == 'fence':
code_info = self._extract_code_block_info(token)
analysis['code_blocks'].append({
'language': code_info.get('language', ''),
'content_length': len(code_info.get('content', '')),
'position': i,
'under_heading_level': current_heading_level
})
# Analyze blockquotes
elif token_type == 'blockquote_open':
quote_content = self._extract_blockquote_content(tokens, i)
analysis['blockquotes'].append({
'content': quote_content,
'position': i,
'under_heading_level': current_heading_level
})
# Analyze tables
elif token_type == 'table_open':
table_structure = self._extract_table_structure(tokens, i)
analysis['tables'].append({
'columns': table_structure.get('columns', 0),
'rows': table_structure.get('rows', 0),
'position': i,
'under_heading_level': current_heading_level
})
# Analyze inline elements
elif token_type == 'inline':
inline_analysis = self._analyze_inline_content(token)
analysis['links'].extend(inline_analysis.get('links', []))
analysis['images'].extend(inline_analysis.get('images', []))
analysis['emphasis'].extend(inline_analysis.get('emphasis', []))
i += 1
# Convert sets to lists for JSON serialization
analysis['structure_types'] = list(analysis['structure_types'])
return analysis
def _create_json_schema(
self,
analysis: Dict[str, Any],
filename: str,
mode: Optional[str] = None,
outline_depth: Optional[int] = None,
capture_heading_text: bool = False,
include_content_instructions: bool = False,
instruction_type: str = 'description'
) -> Dict[str, Any]:
"""
Create a JSON schema from structural analysis (syntactic/outline mode).
Args:
analysis: Structural analysis of the document
filename: Name of the source file
mode: Generation mode ('outline' for structure-focused schemas)
outline_depth: Depth limit for outline mode
capture_heading_text: Whether to capture exact heading text as constraints
include_content_instructions: Whether to include content instruction fields
instruction_type: Type of content instructions to generate
Returns:
JSON schema dictionary
"""
# Determine title format based on mode
title_preposition = "from" if mode == "outline" else "for"
schema = {
"$schema": self.default_schema_url,
"type": "object",
"title": f"Schema {title_preposition} {filename}",
"description": f"JSON schema describing the structure of {filename}",
"properties": {}
}
# Add metaschema extensions for outline mode
if mode == "outline":
schema["x-markitect-outline-mode"] = True
if outline_depth is not None:
schema["x-markitect-outline-depth"] = outline_depth
# Add metaschema extension for heading text capture
if capture_heading_text:
schema["x-markitect-heading-text-capture"] = True
# Add metaschema extension for content instructions
if include_content_instructions:
schema["x-markitect-content-instructions-enabled"] = True
# Add heading structure
if analysis['headings']:
heading_properties = {}
for level_key, headings in analysis['headings'].items():
if headings: # Only include levels that have content
# Configure content property based on heading text capture
if capture_heading_text:
# Extract actual heading texts in document order
heading_texts = [heading['content'] for heading in headings]
content_property = {"enum": heading_texts}
else:
content_property = {"type": "string"}
# Build properties for the heading item
item_properties = {
"content": content_property,
"level": {"type": "integer"},
"position": {"type": "integer"}
}
# Add content instruction fields if enabled
if include_content_instructions:
# Generate appropriate instruction text based on heading level
level_num = int(level_key.split('_')[1])
section_name = f"level {level_num} heading"
instruction_text = self._generate_content_instruction(section_name, instruction_type)
item_properties["x-markitect-content-instructions"] = {
"type": "string",
"const": instruction_text
}
item_properties["x-markitect-instruction-type"] = {
"type": "string",
"enum": [instruction_type]
}
heading_properties[level_key] = {
"type": "array",
"description": f"Headings at {level_key.replace('_', ' ')}",
"items": {
"type": "object",
"properties": item_properties,
"required": ["content", "level"]
},
"minItems": len(headings),
"maxItems": len(headings)
}
if heading_properties:
schema["properties"]["headings"] = {
"type": "object",
"description": "Document heading structure",
"properties": heading_properties
}
# Add other structural elements
structural_elements = {
"paragraphs": ("Text paragraphs", analysis['paragraphs']),
"lists": ("Lists (ordered and unordered)", analysis['lists']),
"code_blocks": ("Code blocks and fenced code", analysis['code_blocks']),
"blockquotes": ("Block quotations", analysis['blockquotes']),
"tables": ("Tables with rows and columns", analysis['tables']),
"links": ("Links to external resources", analysis['links']),
"images": ("Embedded images", analysis['images']),
"emphasis": ("Text emphasis (bold, italic)", analysis['emphasis'])
}
for element_name, (description, element_list) in structural_elements.items():
if element_list:
# Build base schema for the element
element_schema = {
"type": "array",
"description": description,
"minItems": len(element_list),
"maxItems": len(element_list)
}
# Add content instructions for paragraphs and lists if enabled
if include_content_instructions and element_name in ["paragraphs", "lists"]:
element_schema["items"] = {
"type": "object",
"properties": {
"content": {"type": "string"},
"x-markitect-content-instructions": {
"type": "string",
"const": self._generate_content_instruction(element_name, instruction_type)
},
"x-markitect-instruction-type": {
"type": "string",
"enum": [instruction_type]
}
}
}
schema["properties"][element_name] = element_schema
# Add metadata
schema["properties"]["metadata"] = {
"type": "object",
"description": "Document structure metadata",
"properties": {
"total_elements": {
"type": "integer",
"const": sum(len(v) if isinstance(v, list) else 0 for v in analysis.values())
},
"structure_types": {
"type": "array",
"items": {"type": "string"},
"description": "All structural element types found",
"const": analysis['structure_types']
}
}
}
return schema
# =========================================================================
# Shared helpers
# =========================================================================
def _extract_heading_level(self, tag: str) -> int:
"""Extract heading level from HTML tag (h1, h2, etc.)."""
if tag.startswith('h') and len(tag) == 2:
try:
return int(tag[1])
except ValueError:
pass
return 1
def _extract_heading_content(self, tokens: List[Dict[str, Any]], start_index: int) -> str:
"""Extract text content from heading tokens."""
# Look for the inline token that contains the heading text
for i in range(start_index, min(start_index + 3, len(tokens))):
token = tokens[i]
if token.get('type') == 'inline':
return token.get('content', '')
return ''
def _extract_paragraph_content(self, tokens: List[Dict[str, Any]], start_index: int) -> str:
"""Extract text content from paragraph tokens."""
# Look for the inline token that contains the paragraph text
for i in range(start_index, min(start_index + 3, len(tokens))):
token = tokens[i]
if token.get('type') == 'inline':
return token.get('content', '')
return ''
def _extract_list_structure(self, tokens: List[Dict[str, Any]], start_index: int) -> Dict[str, Any]:
"""Extract list structure information."""
return {
"type": "list",
"estimated_items": 1
}
def _extract_code_block_info(self, token: Dict[str, Any]) -> Dict[str, Any]:
"""Extract code block information."""
return {
"language": token.get('info', '').split()[0] if token.get('info') else '',
"content": token.get('content', '')
}
def _extract_blockquote_content(self, tokens: List[Dict[str, Any]], start_index: int) -> str:
"""Extract blockquote content."""
return "blockquote content"
def _extract_table_structure(self, tokens: List[Dict[str, Any]], start_index: int) -> Dict[str, Any]:
"""Extract table structure information (legacy syntactic mode)."""
return {
"columns": 2,
"rows": 1
}
def _analyze_inline_content(self, token: Dict[str, Any]) -> Dict[str, List[Any]]:
"""Analyze inline content for links, images, emphasis."""
result = {
"links": [],
"images": [],
"emphasis": []
}
# Analyze children tokens if they exist
children = token.get('children', [])
for child in children:
if child and isinstance(child, dict):
child_type = child.get('type', '')
if child_type == 'link_open':
result['links'].append({"type": "link"})
elif child_type == 'image':
result['images'].append({"type": "image"})
elif child_type in ['em_open', 'strong_open']:
result['emphasis'].append({"type": child_type})
return result
def _generate_content_instruction(self, heading_text: str, instruction_type: str) -> str:
"""Generate content instruction text based on heading and instruction type."""
if instruction_type == "description":
return f"Provide content for the '{heading_text}' section"
elif instruction_type == "example":
return f"Example content for the '{heading_text}' section"
elif instruction_type == "constraint":
return f"Content must be relevant to '{heading_text}'"
elif instruction_type == "template":
return f"Template content for '{heading_text}' section"
else:
return f"Content for the '{heading_text}' section"