"""
Schema Generator for Issue #5: Generate a Schema from a Markdown File.

This module provides functionality to analyze markdown AST structures and
generate JSON schemas that describe the document's structural elements with
configurable depth limitations for architectural documentation analysis.

Supports two generation modes:
- semantic (default): Builds a content-aware schema from the document's
  section hierarchy, detecting key-value tables, lists, and mixed content
  patterns.
- syntactic: Counts markdown elements by type (legacy behavior).
"""

import json
import re
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Any, Optional, Set

from markitect.core.parser import parse_markdown_to_ast
from markitect.exceptions import (
    FileNotFoundError,
    InvalidDepthError,
    InvalidInstructionTypeError,
)


class SchemaGenerator:
    """
    Generates JSON schemas from markdown file AST structures.

    Analyzes the structural elements of markdown documents and creates JSON
    schemas that can be used for validation and compliance checking in
    architecture documentation workflows.
    """

    # Kept as an ordered tuple (not a set) so that the "Must be one of"
    # error message lists the options in the same order on every run.
    _VALID_INSTRUCTION_TYPES = ('description', 'example', 'constraint', 'template')

    def __init__(self):
        """Initialize the schema generator."""
        self.default_schema_url = "http://json-schema.org/draft-07/schema#"

    def generate_schema_from_file(
        self,
        file_path: Path,
        max_depth: Optional[int] = None,
        mode: Optional[str] = None,
        outline_depth: Optional[int] = None,
        capture_heading_text: bool = False,
        include_content_instructions: bool = False,
        instruction_type: str = 'description'
    ) -> Dict[str, Any]:
        """
        Generate a JSON schema from a markdown file's AST structure.

        Args:
            file_path: Path to the markdown file
            max_depth: Maximum heading depth to include (None = unlimited)
            mode: Generation mode: None/'semantic' for content-aware schemas,
                'syntactic' for element counting, 'outline' for legacy
                outline mode
            outline_depth: Depth limit for outline mode
            capture_heading_text: Whether to capture exact heading text as
                constraints
            include_content_instructions: Whether to include content
                instruction fields
            instruction_type: Type of content instructions ('description',
                'example', 'constraint', 'template')

        Returns:
            JSON schema as a dictionary

        Raises:
            FileNotFoundError: If the markdown file doesn't exist
            InvalidDepthError: If max_depth is invalid (< 1)
            InvalidInstructionTypeError: If instruction_type is not one of
                the supported values
        """
        # Validate inputs
        if not file_path.exists():
            raise FileNotFoundError(f"Markdown file not found: {file_path}")

        if max_depth is not None and max_depth < 1:
            raise InvalidDepthError(f"max_depth must be >= 1, got: {max_depth}")

        # Validate instruction type
        if instruction_type not in self._VALID_INSTRUCTION_TYPES:
            raise InvalidInstructionTypeError(
                f"Invalid instruction type '{instruction_type}'. "
                f"Must be one of: {', '.join(self._VALID_INSTRUCTION_TYPES)}"
            )

        # Read and parse the markdown file
        content = file_path.read_text(encoding='utf-8')
        ast_tokens = parse_markdown_to_ast(content)

        # Auto-select syntactic mode when syntactic-only options are used
        effective_mode = mode
        if effective_mode is None and (capture_heading_text or include_content_instructions):
            effective_mode = 'syntactic'

        # Dispatch based on mode
        if effective_mode in ('syntactic', 'outline'):
            # Legacy: syntactic element-counting schema
            structure_analysis = self._analyze_ast_structure(ast_tokens, max_depth)
            return self._create_json_schema(
                structure_analysis,
                file_path.name,
                mode=effective_mode,
                outline_depth=outline_depth,
                capture_heading_text=capture_heading_text,
                include_content_instructions=include_content_instructions,
                instruction_type=instruction_type
            )

        # Default: semantic content-aware schema
        return self._create_semantic_schema(ast_tokens, file_path.name, max_depth)

    # =========================================================================
    # Semantic schema generation (default mode)
    # =========================================================================

    @staticmethod
    def _slugify(text: str) -> str:
        """
        Convert heading or label text to a valid JSON property key.

        German umlauts and 'ß' are transliterated first; the text is then
        lower-cased and every run of characters outside [a-z0-9] collapses
        to a single underscore. Returns 'feld' when nothing usable remains,
        so the result is never empty.
        """
        replacements = {
            'ä': 'ae', 'ö': 'oe', 'ü': 'ue',
            'Ä': 'Ae', 'Ö': 'Oe', 'Ü': 'Ue',
            'ß': 'ss',
        }
        slug = text
        for char, repl in replacements.items():
            slug = slug.replace(char, repl)
        slug = re.sub(r'[^a-z0-9]+', '_', slug.lower()).strip('_')
        return slug or 'feld'

    @staticmethod
    def _unique_key(base: str, existing: Dict[str, Any]) -> str:
        """
        Return *base*, or *base* with a numeric suffix, such that the result
        is not yet a key of *existing*.

        Suffixes start at 2 ('name', 'name_2', 'name_3', ...). Used so that
        repeated headings or labels do not silently overwrite each other in
        a schema's "properties" mapping.
        """
        key = base
        counter = 2
        while key in existing:
            key = f"{base}_{counter}"
            counter += 1
        return key

    @staticmethod
    def _link_item_schema() -> Dict[str, Any]:
        """Return a fresh item schema for list entries that contain links."""
        return {
            "type": "object",
            "properties": {
                "name": {"type": "string"},
                "link": {"type": "string", "format": "uri"}
            },
            "required": ["name"]
        }

    def _build_section_tree(
        self,
        tokens: List[Dict[str, Any]],
        max_depth: Optional[int] = None
    ) -> Dict[str, Any]:
        """
        Build a hierarchical section tree from flat markdown-it token list.

        Returns a root node with children. Each node has:
        - heading: str (None for root)
        - level: int (0 for root)
        - slug: str
        - content_tokens: list of non-heading tokens belonging to this section
        - children: list of sub-sections

        Headings deeper than max_depth do not open a section of their own;
        the content that follows them stays attached to the enclosing
        section.
        """
        root = {
            'heading': None,
            'level': 0,
            'slug': '',
            'content_tokens': [],
            'children': []
        }
        # Stack of currently open sections; the last entry receives content.
        stack = [root]
        total = len(tokens)
        i = 0

        while i < total:
            token = tokens[i]

            if token.get('type') != 'heading_open':
                # Add content token to current section
                stack[-1]['content_tokens'].append(token)
                i += 1
                continue

            level = self._extract_heading_level(token.get('tag', ''))
            if max_depth is None or level <= max_depth:
                heading_text = self._extract_heading_content(tokens, i)
                section = {
                    'heading': heading_text,
                    'level': level,
                    'slug': self._slugify(heading_text),
                    'content_tokens': [],
                    'children': []
                }
                # Pop stack until we find the parent (level < current)
                while len(stack) > 1 and stack[-1]['level'] >= level:
                    stack.pop()
                stack[-1]['children'].append(section)
                stack.append(section)

            # Skip the heading's own tokens (inline text and heading_close),
            # whether or not the heading opened a section, so they never
            # end up in content_tokens.
            i += 1
            while i < total and tokens[i].get('type') != 'heading_close':
                i += 1
            i += 1

        return root

    def _extract_table_data(
        self,
        tokens: List[Dict[str, Any]],
        start_index: int
    ) -> Dict[str, Any]:
        """
        Extract structured table data: headers and body rows.

        Args:
            tokens: Token list containing the table
            start_index: Index of the table_open token

        Returns:
            {'headers': [cell, ...], 'rows': [[cell, ...], ...]} where each
            cell is the stripped content of an inline token.
        """
        headers: List[str] = []
        rows: List[List[str]] = []
        in_thead = False
        current_row: List[str] = []

        i = start_index + 1  # skip table_open
        while i < len(tokens):
            ttype = tokens[i].get('type', '')
            if ttype == 'table_close':
                break
            elif ttype == 'thead_open':
                in_thead = True
            elif ttype == 'thead_close':
                in_thead = False
            elif ttype == 'tr_open':
                current_row = []
            elif ttype == 'tr_close':
                if in_thead:
                    headers = current_row
                else:
                    rows.append(current_row)
            elif ttype == 'inline':
                current_row.append(tokens[i].get('content', '').strip())
            i += 1

        return {'headers': headers, 'rows': rows}

    def _find_table_in_tokens(
        self,
        content_tokens: List[Dict[str, Any]]
    ) -> Optional[Dict[str, Any]]:
        """Find and extract the first table in a section's content tokens."""
        for index, token in enumerate(content_tokens):
            if token.get('type') == 'table_open':
                return self._extract_table_data(content_tokens, index)
        return None

    def _extract_list_items_text(
        self,
        content_tokens: List[Dict[str, Any]]
    ) -> List[str]:
        """
        Extract text content of top-level list items from section tokens.

        Inline tokens of nested list items are ignored; several inline
        tokens directly inside one top-level item are joined with a space.
        """
        items: List[str] = []
        in_list_item = False
        nesting = 0  # number of currently open list_item tokens
        item_text_parts: List[str] = []

        for token in content_tokens:
            ttype = token.get('type', '')
            if ttype == 'list_item_open':
                if nesting == 0:
                    in_list_item = True
                    item_text_parts = []
                nesting += 1
            elif ttype == 'list_item_close':
                nesting -= 1
                if nesting == 0:
                    in_list_item = False
                    items.append(' '.join(item_text_parts).strip())
            elif ttype == 'inline' and in_list_item and nesting == 1:
                item_text_parts.append(token.get('content', ''))

        return items

    @staticmethod
    def _is_key_value_table(table_data: Dict[str, Any]) -> bool:
        """Detect if a table is a 2-column key-value table (empty headers, 2 cols per row)."""
        if not table_data or not table_data.get('rows'):
            return False
        # All rows must have exactly 2 columns
        if not all(len(row) == 2 for row in table_data['rows']):
            return False
        # Headers must be empty or absent
        headers = table_data.get('headers')
        if headers and not all(h.strip() == '' for h in headers):
            return False
        return True

    @staticmethod
    def _has_top_level_paragraphs(tokens: List[Dict[str, Any]]) -> bool:
        """Check for paragraph tokens that are NOT nested inside list items."""
        list_depth = 0
        for t in tokens:
            ttype = t.get('type', '')
            if ttype in ('bullet_list_open', 'ordered_list_open'):
                list_depth += 1
            elif ttype in ('bullet_list_close', 'ordered_list_close'):
                list_depth -= 1
            elif ttype == 'paragraph_open' and list_depth == 0:
                return True
        return False

    def _section_to_schema(self, section: Dict[str, Any]) -> Dict[str, Any]:
        """
        Convert a section tree node into its JSON schema representation.

        The first matching case wins:
        1. key-value table      -> object with one property per table row,
                                   plus one property per child section
        2. table with headers   -> array of objects (one property per column)
        3. list, no children    -> array (or object with 'freitext' and
                                   'eintraege' when paragraphs are present)
        4. child sub-sections   -> object with one property per child
        5. anything else        -> string

        Property keys are made unique with a numeric suffix so that repeated
        headings/labels do not overwrite one another.
        """
        content_tokens = section['content_tokens']
        children = section['children']
        heading = section.get('heading', '')

        # Detect content types present in this section
        table_data = self._find_table_in_tokens(content_tokens)
        has_list = any(
            t.get('type') in ('bullet_list_open', 'ordered_list_open')
            for t in content_tokens
        )
        has_paragraphs = self._has_top_level_paragraphs(content_tokens)

        # --- Case 1: Key-value table → object with named properties ---
        if table_data and self._is_key_value_table(table_data):
            kv_properties: Dict[str, Any] = {}
            for row in table_data['rows']:
                key = self._unique_key(self._slugify(row[0]), kv_properties)
                kv_properties[key] = {
                    "type": "string",
                    "description": row[0]
                }
            # Merge child sections as additional properties; a child whose
            # slug equals a table key gets a suffixed key instead of
            # replacing the table entry.
            for child in children:
                key = self._unique_key(child['slug'], kv_properties)
                kv_properties[key] = self._section_to_schema(child)
            return {
                "type": "object",
                "description": heading,
                "properties": kv_properties
            }

        # --- Case 2: Data table with meaningful headers → array of objects ---
        # (key-value tables already returned above)
        if table_data and table_data.get('headers'):
            item_properties: Dict[str, Any] = {}
            for hdr in table_data['headers']:
                key = self._unique_key(self._slugify(hdr), item_properties)
                item_properties[key] = {"type": "string", "description": hdr}
            return {
                "type": "array",
                "description": heading,
                "items": {
                    "type": "object",
                    "properties": item_properties
                }
            }

        # --- Case 3: Pure list (no child sections) ---
        if has_list and not children:
            list_items = self._extract_list_items_text(content_tokens)
            # Heuristic for markdown links: "[text](target)"
            items_have_links = any('[' in it and '](' in it for it in list_items)
            item_schema: Any = (
                self._link_item_schema() if items_have_links else {"type": "string"}
            )

            if has_paragraphs:
                # Mixed: paragraphs + list
                return {
                    "type": "object",
                    "description": heading,
                    "properties": {
                        "freitext": {"type": "string"},
                        "eintraege": {"type": "array", "items": item_schema}
                    }
                }

            # Pure list
            return {
                "type": "array",
                "description": heading,
                "items": item_schema
            }

        # --- Case 4: Section with child sub-sections ---
        if children:
            properties: Dict[str, Any] = {}
            # Direct content before first child
            if has_list:
                properties["eintraege"] = {
                    "type": "array",
                    "items": {"type": "string"}
                }
            elif has_paragraphs:
                properties["inhalt"] = {"type": "string"}

            for child in children:
                key = self._unique_key(child['slug'], properties)
                properties[key] = self._section_to_schema(child)

            return {
                "type": "object",
                "description": heading,
                "properties": properties
            }

        # --- Case 5 / default: text-only, empty or unrecognized section ---
        return {
            "type": "string",
            "description": heading
        }

    def _create_semantic_schema(
        self,
        tokens: List[Dict[str, Any]],
        filename: str,
        max_depth: Optional[int] = None
    ) -> Dict[str, Any]:
        """
        Create a semantic JSON schema from the document's section hierarchy.

        Args:
            tokens: List of AST tokens from markdown-it
            filename: Name of the source file (used in title/description)
            max_depth: Maximum heading depth that opens its own section

        Returns:
            JSON schema dictionary with one property per top-level section
        """
        tree = self._build_section_tree(tokens, max_depth)

        properties: Dict[str, Any] = {}
        # Build properties from top-level sections; repeated headings get a
        # numeric suffix instead of overwriting the earlier section.
        for section in tree['children']:
            key = self._unique_key(section['slug'], properties)
            properties[key] = self._section_to_schema(section)

        return {
            "$schema": self.default_schema_url,
            "type": "object",
            "title": f"Schema from {filename}",
            "description": f"Semantic schema describing the content structure of {filename}",
            "properties": properties
        }

    # =========================================================================
    # Syntactic schema generation (legacy mode: --mode syntactic / --mode outline)
    # =========================================================================

    def _analyze_ast_structure(self, tokens: List[Dict[str, Any]], max_depth: Optional[int]) -> Dict[str, Any]:
        """
        Analyze AST tokens to extract structural patterns (element counting).

        Args:
            tokens: List of AST tokens from markdown-it
            max_depth: Maximum heading depth to analyze

        Returns:
            Dictionary containing structural analysis. 'structure_types' is
            returned as a sorted list so the result is reproducible and
            JSON-serializable.
        """
        analysis: Dict[str, Any] = {
            'headings': defaultdict(list),
            'paragraphs': [],
            'lists': [],
            'code_blocks': [],
            'blockquotes': [],
            'tables': [],
            'links': [],
            'images': [],
            'emphasis': [],
            'structure_types': set()
        }

        current_heading_level = 0

        for i, token in enumerate(tokens):
            token_type = token.get('type', '')

            # Track all structural types found
            analysis['structure_types'].add(token_type)

            # Analyze headings with depth filtering
            if token_type == 'heading_open':
                level = self._extract_heading_level(token.get('tag', ''))
                if max_depth is None or level <= max_depth:
                    heading_content = self._extract_heading_content(tokens, i)
                    analysis['headings'][f'level_{level}'].append({
                        'content': heading_content,
                        'level': level,
                        'position': i
                    })
                    current_heading_level = level

            # Analyze paragraphs
            elif token_type == 'paragraph_open':
                paragraph_content = self._extract_paragraph_content(tokens, i)
                analysis['paragraphs'].append({
                    'content': paragraph_content,
                    'position': i,
                    'under_heading_level': current_heading_level
                })

            # Analyze lists
            elif token_type in ('bullet_list_open', 'ordered_list_open'):
                list_structure = self._extract_list_structure(tokens, i)
                analysis['lists'].append({
                    'type': 'bullet' if token_type == 'bullet_list_open' else 'ordered',
                    'structure': list_structure,
                    'position': i,
                    'under_heading_level': current_heading_level
                })

            # Analyze code blocks
            elif token_type in ('code_block', 'fence'):
                code_info = self._extract_code_block_info(token)
                analysis['code_blocks'].append({
                    'language': code_info.get('language', ''),
                    'content_length': len(code_info.get('content', '')),
                    'position': i,
                    'under_heading_level': current_heading_level
                })

            # Analyze blockquotes
            elif token_type == 'blockquote_open':
                quote_content = self._extract_blockquote_content(tokens, i)
                analysis['blockquotes'].append({
                    'content': quote_content,
                    'position': i,
                    'under_heading_level': current_heading_level
                })

            # Analyze tables
            elif token_type == 'table_open':
                table_structure = self._extract_table_structure(tokens, i)
                analysis['tables'].append({
                    'columns': table_structure.get('columns', 0),
                    'rows': table_structure.get('rows', 0),
                    'position': i,
                    'under_heading_level': current_heading_level
                })

            # Analyze inline elements
            elif token_type == 'inline':
                inline_analysis = self._analyze_inline_content(token)
                analysis['links'].extend(inline_analysis.get('links', []))
                analysis['images'].extend(inline_analysis.get('images', []))
                analysis['emphasis'].extend(inline_analysis.get('emphasis', []))

        # Convert the set to a list for JSON serialization. Sorting makes the
        # order independent of string hash randomization, so the generated
        # schema is identical from run to run.
        analysis['structure_types'] = sorted(analysis['structure_types'], key=str)

        return analysis

    def _create_json_schema(
        self,
        analysis: Dict[str, Any],
        filename: str,
        mode: Optional[str] = None,
        outline_depth: Optional[int] = None,
        capture_heading_text: bool = False,
        include_content_instructions: bool = False,
        instruction_type: str = 'description'
    ) -> Dict[str, Any]:
        """
        Create a JSON schema from structural analysis (syntactic/outline mode).

        Args:
            analysis: Structural analysis of the document
            filename: Name of the source file
            mode: Generation mode ('outline' for structure-focused schemas)
            outline_depth: Depth limit for outline mode
            capture_heading_text: Whether to capture exact heading text as
                constraints
            include_content_instructions: Whether to include content
                instruction fields
            instruction_type: Type of content instructions to generate

        Returns:
            JSON schema dictionary
        """
        # Determine title format based on mode
        title_preposition = "from" if mode == "outline" else "for"

        schema: Dict[str, Any] = {
            "$schema": self.default_schema_url,
            "type": "object",
            "title": f"Schema {title_preposition} {filename}",
            "description": f"JSON schema describing the structure of {filename}",
            "properties": {}
        }

        # Add metaschema extensions for outline mode
        if mode == "outline":
            schema["x-markitect-outline-mode"] = True
            if outline_depth is not None:
                schema["x-markitect-outline-depth"] = outline_depth

        # Add metaschema extension for heading text capture
        if capture_heading_text:
            schema["x-markitect-heading-text-capture"] = True

        # Add metaschema extension for content instructions
        if include_content_instructions:
            schema["x-markitect-content-instructions-enabled"] = True

        # Add heading structure
        if analysis['headings']:
            heading_properties: Dict[str, Any] = {}

            for level_key, headings in analysis['headings'].items():
                if not headings:  # Only include levels that have content
                    continue

                # Configure content property based on heading text capture
                if capture_heading_text:
                    # Extract actual heading texts in document order
                    heading_texts = [heading['content'] for heading in headings]
                    content_property: Dict[str, Any] = {"enum": heading_texts}
                else:
                    content_property = {"type": "string"}

                # Build properties for the heading item
                item_properties: Dict[str, Any] = {
                    "content": content_property,
                    "level": {"type": "integer"},
                    "position": {"type": "integer"}
                }

                # Add content instruction fields if enabled
                if include_content_instructions:
                    # Generate appropriate instruction text based on heading
                    # level; level_key has the form 'level_<n>'.
                    level_num = int(level_key.split('_')[1])
                    section_name = f"level {level_num} heading"
                    instruction_text = self._generate_content_instruction(
                        section_name, instruction_type
                    )

                    item_properties["x-markitect-content-instructions"] = {
                        "type": "string",
                        "const": instruction_text
                    }
                    item_properties["x-markitect-instruction-type"] = {
                        "type": "string",
                        "enum": [instruction_type]
                    }

                heading_properties[level_key] = {
                    "type": "array",
                    "description": f"Headings at {level_key.replace('_', ' ')}",
                    "items": {
                        "type": "object",
                        "properties": item_properties,
                        "required": ["content", "level"]
                    },
                    "minItems": len(headings),
                    "maxItems": len(headings)
                }

            if heading_properties:
                schema["properties"]["headings"] = {
                    "type": "object",
                    "description": "Document heading structure",
                    "properties": heading_properties
                }

        # Add other structural elements
        structural_elements = {
            "paragraphs": ("Text paragraphs", analysis['paragraphs']),
            "lists": ("Lists (ordered and unordered)", analysis['lists']),
            "code_blocks": ("Code blocks and fenced code", analysis['code_blocks']),
            "blockquotes": ("Block quotations", analysis['blockquotes']),
            "tables": ("Tables with rows and columns", analysis['tables']),
            "links": ("Links to external resources", analysis['links']),
            "images": ("Embedded images", analysis['images']),
            "emphasis": ("Text emphasis (bold, italic)", analysis['emphasis'])
        }

        for element_name, (description, element_list) in structural_elements.items():
            if not element_list:
                continue

            # Build base schema for the element
            element_schema: Dict[str, Any] = {
                "type": "array",
                "description": description,
                "minItems": len(element_list),
                "maxItems": len(element_list)
            }

            # Add content instructions for paragraphs and lists if enabled
            if include_content_instructions and element_name in ("paragraphs", "lists"):
                element_schema["items"] = {
                    "type": "object",
                    "properties": {
                        "content": {"type": "string"},
                        "x-markitect-content-instructions": {
                            "type": "string",
                            "const": self._generate_content_instruction(
                                element_name, instruction_type
                            )
                        },
                        "x-markitect-instruction-type": {
                            "type": "string",
                            "enum": [instruction_type]
                        }
                    }
                }

            schema["properties"][element_name] = element_schema

        # Add metadata
        # NOTE(review): total_elements sums every list-valued entry of
        # `analysis`. 'headings' is a dict and therefore not counted, while
        # 'structure_types' (a list by the time _analyze_ast_structure
        # returns) is counted. Left unchanged because existing schemas carry
        # this value as a const - confirm whether that is intended.
        schema["properties"]["metadata"] = {
            "type": "object",
            "description": "Document structure metadata",
            "properties": {
                "total_elements": {
                    "type": "integer",
                    "const": sum(len(v) if isinstance(v, list) else 0 for v in analysis.values())
                },
                "structure_types": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "All structural element types found",
                    "const": analysis['structure_types']
                }
            }
        }

        return schema

    # =========================================================================
    # Shared helpers
    # =========================================================================

    def _extract_heading_level(self, tag: str) -> int:
        """
        Extract heading level from HTML tag (h1, h2, etc.).

        Falls back to 1 for anything that is not 'h' followed by one digit.
        """
        if tag.startswith('h') and len(tag) == 2:
            try:
                return int(tag[1])
            except ValueError:
                pass
        return 1

    @staticmethod
    def _first_inline_content(tokens: List[Dict[str, Any]], start_index: int) -> str:
        """
        Return the content of the first inline token found at start_index or
        one of the two tokens after it, or '' when there is none.
        """
        for token in tokens[start_index:start_index + 3]:
            if token.get('type') == 'inline':
                return token.get('content', '')
        return ''

    def _extract_heading_content(self, tokens: List[Dict[str, Any]], start_index: int) -> str:
        """Extract text content from heading tokens."""
        # Look for the inline token that contains the heading text
        return self._first_inline_content(tokens, start_index)

    def _extract_paragraph_content(self, tokens: List[Dict[str, Any]], start_index: int) -> str:
        """Extract text content from paragraph tokens."""
        # Look for the inline token that contains the paragraph text
        return self._first_inline_content(tokens, start_index)

    def _extract_list_structure(self, tokens: List[Dict[str, Any]], start_index: int) -> Dict[str, Any]:
        """
        Extract list structure information.

        Currently a placeholder: the arguments are not inspected and a fixed
        structure is returned.
        """
        return {
            "type": "list",
            "estimated_items": 1
        }

    def _extract_code_block_info(self, token: Dict[str, Any]) -> Dict[str, Any]:
        """
        Extract code block information.

        Returns:
            {'language': first word of the token's info string ('' when the
            info string is missing, empty or whitespace-only),
             'content': the token's content ('' when missing)}
        """
        # split() on a whitespace-only info string yields [], so guard the
        # index instead of testing the raw string for truthiness.
        info_words = (token.get('info') or '').split()
        return {
            "language": info_words[0] if info_words else '',
            "content": token.get('content') or ''
        }

    def _extract_blockquote_content(self, tokens: List[Dict[str, Any]], start_index: int) -> str:
        """
        Extract blockquote content.

        Currently a placeholder: the arguments are not inspected and a fixed
        string is returned.
        """
        return "blockquote content"

    def _extract_table_structure(self, tokens: List[Dict[str, Any]], start_index: int) -> Dict[str, Any]:
        """
        Extract table structure information (legacy syntactic mode).

        Currently a placeholder: the arguments are not inspected and fixed
        dimensions are returned.
        """
        return {
            "columns": 2,
            "rows": 1
        }

    def _analyze_inline_content(self, token: Dict[str, Any]) -> Dict[str, List[Any]]:
        """
        Analyze inline content for links, images, emphasis.

        Only the token's direct children are inspected. A missing or null
        'children' entry is treated as "no children".
        """
        result: Dict[str, List[Any]] = {
            "links": [],
            "images": [],
            "emphasis": []
        }

        # `or []` also covers a 'children' key that is present but None.
        for child in token.get('children') or []:
            if not (child and isinstance(child, dict)):
                continue
            child_type = child.get('type', '')
            if child_type == 'link_open':
                result['links'].append({"type": "link"})
            elif child_type == 'image':
                result['images'].append({"type": "image"})
            elif child_type in ('em_open', 'strong_open'):
                result['emphasis'].append({"type": child_type})

        return result

    def _generate_content_instruction(self, heading_text: str, instruction_type: str) -> str:
        """
        Generate content instruction text based on heading and instruction type.

        Unknown instruction types fall back to a generic sentence.
        """
        if instruction_type == "description":
            return f"Provide content for the '{heading_text}' section"
        if instruction_type == "example":
            return f"Example content for the '{heading_text}' section"
        if instruction_type == "constraint":
            return f"Content must be relevant to '{heading_text}'"
        if instruction_type == "template":
            return f"Template content for '{heading_text}' section"
        return f"Content for the '{heading_text}' section"