""" Schema Analyzer for Phase 2: Schema Refinement Tools Analyzes JSON schemas to detect rigidity issues and provide suggestions for improvement using the Phase 1 classification system. """ from pathlib import Path from typing import Dict, Any, List, Optional, Tuple import json from dataclasses import dataclass, field from enum import Enum class IssueType(Enum): """Types of schema rigidity issues.""" EXACT_COUNT = "exact_count" MISSING_CLASSIFICATIONS = "missing_classifications" MISSING_CONTENT_INSTRUCTIONS = "missing_content_instructions" OVERLY_SPECIFIC = "overly_specific" NO_FLEXIBILITY = "no_flexibility" DEPRECATED_EXTENSIONS = "deprecated_extensions" class IssueSeverity(Enum): """Severity levels for schema issues.""" INFO = "info" WARNING = "warning" ERROR = "error" @dataclass class SchemaIssue: """Represents a detected schema issue.""" issue_type: IssueType severity: IssueSeverity path: str message: str suggestion: str current_value: Any = None suggested_value: Any = None @dataclass class SchemaAnalysisResult: """Results of schema analysis.""" is_rigid: bool rigidity_score: int # 0-100, higher = more rigid issues: List[SchemaIssue] = field(default_factory=list) has_classifications: bool = False has_content_control: bool = False uses_deprecated_extensions: bool = False @property def issue_count_by_severity(self) -> Dict[IssueSeverity, int]: """Count issues by severity.""" counts = {severity: 0 for severity in IssueSeverity} for issue in self.issues: counts[issue.severity] += 1 return counts class SchemaAnalyzer: """Analyzes schemas for rigidity and suggests improvements.""" def __init__(self): """Initialize the schema analyzer.""" self.deprecated_extensions = [ "x-markitect-required-sections", "x-markitect-recommended-sections", "x-markitect-optional-sections" ] def analyze_schema(self, schema: Dict[str, Any]) -> SchemaAnalysisResult: """ Analyze a schema for rigidity issues. Args: schema: The JSON schema to analyze Returns: SchemaAnalysisResult with detected issues and suggestions """ result = SchemaAnalysisResult(is_rigid=False, rigidity_score=0) # Check for Phase 1 features result.has_classifications = "x-markitect-sections" in schema result.has_content_control = "x-markitect-content-control" in schema # Check for deprecated extensions for deprecated in self.deprecated_extensions: if deprecated in schema: result.uses_deprecated_extensions = True result.issues.append(SchemaIssue( issue_type=IssueType.DEPRECATED_EXTENSIONS, severity=IssueSeverity.WARNING, path=deprecated, message=f"Using deprecated extension '{deprecated}'", suggestion=f"Migrate to 'x-markitect-sections' with classification system" )) # Analyze properties for rigidity if "properties" in schema: self._analyze_properties(schema["properties"], result, "properties") # Check for missing classifications if not result.has_classifications: result.issues.append(SchemaIssue( issue_type=IssueType.MISSING_CLASSIFICATIONS, severity=IssueSeverity.INFO, path="root", message="Schema does not use section classification system", suggestion="Add 'x-markitect-sections' to classify sections as required/recommended/optional/discouraged/improper" )) # Check for missing content control if not result.has_content_control: result.issues.append(SchemaIssue( issue_type=IssueType.MISSING_CONTENT_INSTRUCTIONS, severity=IssueSeverity.INFO, path="root", message="Schema does not provide content control", suggestion="Add 'x-markitect-content-control' for pattern validation and quality metrics" )) # Calculate rigidity score result.rigidity_score = self._calculate_rigidity_score(result) result.is_rigid = result.rigidity_score > 50 return result def _analyze_properties(self, properties: Dict[str, Any], result: SchemaAnalysisResult, path: str): """Analyze schema properties for rigidity issues.""" for prop_name, prop_def in properties.items(): prop_path = f"{path}.{prop_name}" if not isinstance(prop_def, dict): continue # Check for exact counts (const) if "const" in prop_def: result.issues.append(SchemaIssue( issue_type=IssueType.EXACT_COUNT, severity=IssueSeverity.WARNING, path=prop_path, message=f"Property '{prop_name}' requires exact value", suggestion=f"Consider using a range or removing constraint for flexibility", current_value=prop_def["const"] )) # Check for arrays with exact counts if prop_def.get("type") == "array": min_items = prop_def.get("minItems") max_items = prop_def.get("maxItems") if min_items is not None and max_items is not None and min_items == max_items: result.issues.append(SchemaIssue( issue_type=IssueType.EXACT_COUNT, severity=IssueSeverity.WARNING, path=prop_path, message=f"Array '{prop_name}' requires exactly {min_items} items", suggestion=f"Use a range like minItems: {max(0, min_items - 2)}, maxItems: {min_items + 5}", current_value={"minItems": min_items, "maxItems": max_items}, suggested_value={ "minItems": max(0, min_items - 2), "maxItems": min_items + 5 } )) # Check for overly specific counts (large numbers) if min_items is not None and min_items > 50: result.issues.append(SchemaIssue( issue_type=IssueType.OVERLY_SPECIFIC, severity=IssueSeverity.INFO, path=prop_path, message=f"Array '{prop_name}' has very specific minItems: {min_items}", suggestion=f"Consider rounding to {(min_items // 10) * 10} for flexibility", current_value=min_items, suggested_value=(min_items // 10) * 10 )) # Check for overly specific integer constraints if prop_def.get("type") == "integer": if "minimum" in prop_def and "maximum" in prop_def: min_val = prop_def["minimum"] max_val = prop_def["maximum"] range_size = max_val - min_val if range_size < 3: result.issues.append(SchemaIssue( issue_type=IssueType.NO_FLEXIBILITY, severity=IssueSeverity.INFO, path=prop_path, message=f"Integer '{prop_name}' has very narrow range: {min_val}-{max_val}", suggestion=f"Consider widening range for flexibility", current_value={"minimum": min_val, "maximum": max_val} )) # Recursively check nested properties if "properties" in prop_def: self._analyze_properties(prop_def["properties"], result, prop_path) # Check items schema for arrays if "items" in prop_def and isinstance(prop_def["items"], dict): if "properties" in prop_def["items"]: self._analyze_properties( prop_def["items"]["properties"], result, f"{prop_path}.items" ) def _calculate_rigidity_score(self, result: SchemaAnalysisResult) -> int: """ Calculate overall rigidity score (0-100). Higher score = more rigid schema. """ score = 0 # Count issues by type with weighted scores weights = { IssueType.EXACT_COUNT: 15, IssueType.OVERLY_SPECIFIC: 10, IssueType.NO_FLEXIBILITY: 8, IssueType.MISSING_CLASSIFICATIONS: 5, IssueType.MISSING_CONTENT_INSTRUCTIONS: 3, IssueType.DEPRECATED_EXTENSIONS: 5 } for issue in result.issues: score += weights.get(issue.issue_type, 5) # Cap at 100 return min(100, score) def analyze_schema_file(self, schema_path: Path) -> SchemaAnalysisResult: """ Analyze a schema file. Args: schema_path: Path to JSON schema file Returns: SchemaAnalysisResult """ with open(schema_path) as f: schema = json.load(f) return self.analyze_schema(schema) def format_analysis_report(self, result: SchemaAnalysisResult, verbose: bool = False) -> str: """ Format analysis results as a human-readable report. Args: result: Analysis results verbose: Include detailed information Returns: Formatted report string """ lines = [] # Header lines.append("=" * 70) lines.append("Schema Analysis Report") lines.append("=" * 70) lines.append("") # Overall assessment rigidity_level = "HIGH" if result.rigidity_score > 70 else "MEDIUM" if result.rigidity_score > 40 else "LOW" lines.append(f"Rigidity Score: {result.rigidity_score}/100 ({rigidity_level})") lines.append(f"Status: {'RIGID - Needs refinement' if result.is_rigid else 'FLEXIBLE - Good'}") lines.append("") # Features check lines.append("Phase 1 Features:") lines.append(f" ✓ Classifications: {'Yes' if result.has_classifications else 'No'}") lines.append(f" ✓ Content Control: {'Yes' if result.has_content_control else 'No'}") if result.uses_deprecated_extensions: lines.append(f" ⚠ Deprecated Extensions: Yes (needs migration)") lines.append("") # Issue summary counts = result.issue_count_by_severity lines.append(f"Issues Found: {len(result.issues)} total") lines.append(f" - Errors: {counts[IssueSeverity.ERROR]}") lines.append(f" - Warnings: {counts[IssueSeverity.WARNING]}") lines.append(f" - Info: {counts[IssueSeverity.INFO]}") lines.append("") # List issues if result.issues: lines.append("Detected Issues:") lines.append("-" * 70) for i, issue in enumerate(result.issues, 1): severity_icon = "❌" if issue.severity == IssueSeverity.ERROR else "⚠️ " if issue.severity == IssueSeverity.WARNING else "ℹ️ " lines.append(f"{i}. {severity_icon} {issue.message}") lines.append(f" Path: {issue.path}") lines.append(f" Suggestion: {issue.suggestion}") if verbose and issue.current_value is not None: lines.append(f" Current: {json.dumps(issue.current_value)}") if verbose and issue.suggested_value is not None: lines.append(f" Suggested: {json.dumps(issue.suggested_value)}") lines.append("") else: lines.append("✅ No issues found - schema is well-designed!") lines.append("") # Recommendations if result.is_rigid: lines.append("Recommendations:") lines.append("-" * 70) lines.append("Run: markitect schema-refine --loosen-counts") lines.append(" to automatically apply suggested improvements") lines.append("") return "\n".join(lines) def analyze_schema_cli(schema_path: str, verbose: bool = False) -> int: """ CLI entry point for schema analysis. Args: schema_path: Path to schema file verbose: Show detailed information Returns: Exit code (0 = success, 1 = rigid schema found) """ analyzer = SchemaAnalyzer() try: result = analyzer.analyze_schema_file(Path(schema_path)) report = analyzer.format_analysis_report(result, verbose=verbose) print(report) return 1 if result.is_rigid else 0 except FileNotFoundError: print(f"Error: Schema file not found: {schema_path}") return 2 except json.JSONDecodeError as e: print(f"Error: Invalid JSON in schema file: {e}") return 2 except Exception as e: print(f"Error: {e}") return 2