Files
markitect-main/markitect/schema_analyzer.py
tegwick 2b35fcde62 feat: add Phase 2 schema refinement tools (schema-analyze and schema-refine)
Implemented two new CLI commands for schema analysis and refinement:

1. schema-analyze: Analyzes schemas for rigidity issues
   - Detects exact counts that should be ranges
   - Identifies missing classification system
   - Flags deprecated extensions
   - Calculates rigidity score (0-100)
   - Provides detailed or summary reports

2. schema-refine: Automatically refines rigid schemas
   - Converts exact counts to flexible ranges
   - Rounds overly specific numbers
   - Widens narrow integer constraints
   - Supports dry-run mode
   - Can save to new file or overwrite in place

Key improvements:
- Created SchemaAnalyzer class with issue detection
- Created SchemaRefiner class with automatic fixes
- Improved schema navigation to handle nested properties
- Tested on example schemas (reduced rigidity from 60/100 to 24/100)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-04 21:29:08 +01:00

353 lines
13 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Schema Analyzer for Phase 2: Schema Refinement Tools
Analyzes JSON schemas to detect rigidity issues and provide suggestions
for improvement using the Phase 1 classification system.
"""
from pathlib import Path
from typing import Dict, Any, List, Optional, Tuple
import json
from dataclasses import dataclass, field
from enum import Enum
class IssueType(Enum):
"""Types of schema rigidity issues."""
EXACT_COUNT = "exact_count"
MISSING_CLASSIFICATIONS = "missing_classifications"
MISSING_CONTENT_INSTRUCTIONS = "missing_content_instructions"
OVERLY_SPECIFIC = "overly_specific"
NO_FLEXIBILITY = "no_flexibility"
DEPRECATED_EXTENSIONS = "deprecated_extensions"
class IssueSeverity(Enum):
"""Severity levels for schema issues."""
INFO = "info"
WARNING = "warning"
ERROR = "error"
@dataclass
class SchemaIssue:
"""Represents a detected schema issue."""
issue_type: IssueType
severity: IssueSeverity
path: str
message: str
suggestion: str
current_value: Any = None
suggested_value: Any = None
@dataclass
class SchemaAnalysisResult:
"""Results of schema analysis."""
is_rigid: bool
rigidity_score: int # 0-100, higher = more rigid
issues: List[SchemaIssue] = field(default_factory=list)
has_classifications: bool = False
has_content_control: bool = False
uses_deprecated_extensions: bool = False
@property
def issue_count_by_severity(self) -> Dict[IssueSeverity, int]:
"""Count issues by severity."""
counts = {severity: 0 for severity in IssueSeverity}
for issue in self.issues:
counts[issue.severity] += 1
return counts
class SchemaAnalyzer:
"""Analyzes schemas for rigidity and suggests improvements."""
def __init__(self):
"""Initialize the schema analyzer."""
self.deprecated_extensions = [
"x-markitect-required-sections",
"x-markitect-recommended-sections",
"x-markitect-optional-sections"
]
def analyze_schema(self, schema: Dict[str, Any]) -> SchemaAnalysisResult:
"""
Analyze a schema for rigidity issues.
Args:
schema: The JSON schema to analyze
Returns:
SchemaAnalysisResult with detected issues and suggestions
"""
result = SchemaAnalysisResult(is_rigid=False, rigidity_score=0)
# Check for Phase 1 features
result.has_classifications = "x-markitect-sections" in schema
result.has_content_control = "x-markitect-content-control" in schema
# Check for deprecated extensions
for deprecated in self.deprecated_extensions:
if deprecated in schema:
result.uses_deprecated_extensions = True
result.issues.append(SchemaIssue(
issue_type=IssueType.DEPRECATED_EXTENSIONS,
severity=IssueSeverity.WARNING,
path=deprecated,
message=f"Using deprecated extension '{deprecated}'",
suggestion=f"Migrate to 'x-markitect-sections' with classification system"
))
# Analyze properties for rigidity
if "properties" in schema:
self._analyze_properties(schema["properties"], result, "properties")
# Check for missing classifications
if not result.has_classifications:
result.issues.append(SchemaIssue(
issue_type=IssueType.MISSING_CLASSIFICATIONS,
severity=IssueSeverity.INFO,
path="root",
message="Schema does not use section classification system",
suggestion="Add 'x-markitect-sections' to classify sections as required/recommended/optional/discouraged/improper"
))
# Check for missing content control
if not result.has_content_control:
result.issues.append(SchemaIssue(
issue_type=IssueType.MISSING_CONTENT_INSTRUCTIONS,
severity=IssueSeverity.INFO,
path="root",
message="Schema does not provide content control",
suggestion="Add 'x-markitect-content-control' for pattern validation and quality metrics"
))
# Calculate rigidity score
result.rigidity_score = self._calculate_rigidity_score(result)
result.is_rigid = result.rigidity_score > 50
return result
def _analyze_properties(self, properties: Dict[str, Any], result: SchemaAnalysisResult, path: str):
"""Analyze schema properties for rigidity issues."""
for prop_name, prop_def in properties.items():
prop_path = f"{path}.{prop_name}"
if not isinstance(prop_def, dict):
continue
# Check for exact counts (const)
if "const" in prop_def:
result.issues.append(SchemaIssue(
issue_type=IssueType.EXACT_COUNT,
severity=IssueSeverity.WARNING,
path=prop_path,
message=f"Property '{prop_name}' requires exact value",
suggestion=f"Consider using a range or removing constraint for flexibility",
current_value=prop_def["const"]
))
# Check for arrays with exact counts
if prop_def.get("type") == "array":
min_items = prop_def.get("minItems")
max_items = prop_def.get("maxItems")
if min_items is not None and max_items is not None and min_items == max_items:
result.issues.append(SchemaIssue(
issue_type=IssueType.EXACT_COUNT,
severity=IssueSeverity.WARNING,
path=prop_path,
message=f"Array '{prop_name}' requires exactly {min_items} items",
suggestion=f"Use a range like minItems: {max(0, min_items - 2)}, maxItems: {min_items + 5}",
current_value={"minItems": min_items, "maxItems": max_items},
suggested_value={
"minItems": max(0, min_items - 2),
"maxItems": min_items + 5
}
))
# Check for overly specific counts (large numbers)
if min_items is not None and min_items > 50:
result.issues.append(SchemaIssue(
issue_type=IssueType.OVERLY_SPECIFIC,
severity=IssueSeverity.INFO,
path=prop_path,
message=f"Array '{prop_name}' has very specific minItems: {min_items}",
suggestion=f"Consider rounding to {(min_items // 10) * 10} for flexibility",
current_value=min_items,
suggested_value=(min_items // 10) * 10
))
# Check for overly specific integer constraints
if prop_def.get("type") == "integer":
if "minimum" in prop_def and "maximum" in prop_def:
min_val = prop_def["minimum"]
max_val = prop_def["maximum"]
range_size = max_val - min_val
if range_size < 3:
result.issues.append(SchemaIssue(
issue_type=IssueType.NO_FLEXIBILITY,
severity=IssueSeverity.INFO,
path=prop_path,
message=f"Integer '{prop_name}' has very narrow range: {min_val}-{max_val}",
suggestion=f"Consider widening range for flexibility",
current_value={"minimum": min_val, "maximum": max_val}
))
# Recursively check nested properties
if "properties" in prop_def:
self._analyze_properties(prop_def["properties"], result, prop_path)
# Check items schema for arrays
if "items" in prop_def and isinstance(prop_def["items"], dict):
if "properties" in prop_def["items"]:
self._analyze_properties(
prop_def["items"]["properties"],
result,
f"{prop_path}.items"
)
def _calculate_rigidity_score(self, result: SchemaAnalysisResult) -> int:
"""
Calculate overall rigidity score (0-100).
Higher score = more rigid schema.
"""
score = 0
# Count issues by type with weighted scores
weights = {
IssueType.EXACT_COUNT: 15,
IssueType.OVERLY_SPECIFIC: 10,
IssueType.NO_FLEXIBILITY: 8,
IssueType.MISSING_CLASSIFICATIONS: 5,
IssueType.MISSING_CONTENT_INSTRUCTIONS: 3,
IssueType.DEPRECATED_EXTENSIONS: 5
}
for issue in result.issues:
score += weights.get(issue.issue_type, 5)
# Cap at 100
return min(100, score)
def analyze_schema_file(self, schema_path: Path) -> SchemaAnalysisResult:
"""
Analyze a schema file.
Args:
schema_path: Path to JSON schema file
Returns:
SchemaAnalysisResult
"""
with open(schema_path) as f:
schema = json.load(f)
return self.analyze_schema(schema)
def format_analysis_report(self, result: SchemaAnalysisResult, verbose: bool = False) -> str:
"""
Format analysis results as a human-readable report.
Args:
result: Analysis results
verbose: Include detailed information
Returns:
Formatted report string
"""
lines = []
# Header
lines.append("=" * 70)
lines.append("Schema Analysis Report")
lines.append("=" * 70)
lines.append("")
# Overall assessment
rigidity_level = "HIGH" if result.rigidity_score > 70 else "MEDIUM" if result.rigidity_score > 40 else "LOW"
lines.append(f"Rigidity Score: {result.rigidity_score}/100 ({rigidity_level})")
lines.append(f"Status: {'RIGID - Needs refinement' if result.is_rigid else 'FLEXIBLE - Good'}")
lines.append("")
# Features check
lines.append("Phase 1 Features:")
lines.append(f" ✓ Classifications: {'Yes' if result.has_classifications else 'No'}")
lines.append(f" ✓ Content Control: {'Yes' if result.has_content_control else 'No'}")
if result.uses_deprecated_extensions:
lines.append(f" ⚠ Deprecated Extensions: Yes (needs migration)")
lines.append("")
# Issue summary
counts = result.issue_count_by_severity
lines.append(f"Issues Found: {len(result.issues)} total")
lines.append(f" - Errors: {counts[IssueSeverity.ERROR]}")
lines.append(f" - Warnings: {counts[IssueSeverity.WARNING]}")
lines.append(f" - Info: {counts[IssueSeverity.INFO]}")
lines.append("")
# List issues
if result.issues:
lines.append("Detected Issues:")
lines.append("-" * 70)
for i, issue in enumerate(result.issues, 1):
severity_icon = "" if issue.severity == IssueSeverity.ERROR else "⚠️ " if issue.severity == IssueSeverity.WARNING else " "
lines.append(f"{i}. {severity_icon} {issue.message}")
lines.append(f" Path: {issue.path}")
lines.append(f" Suggestion: {issue.suggestion}")
if verbose and issue.current_value is not None:
lines.append(f" Current: {json.dumps(issue.current_value)}")
if verbose and issue.suggested_value is not None:
lines.append(f" Suggested: {json.dumps(issue.suggested_value)}")
lines.append("")
else:
lines.append("✅ No issues found - schema is well-designed!")
lines.append("")
# Recommendations
if result.is_rigid:
lines.append("Recommendations:")
lines.append("-" * 70)
lines.append("Run: markitect schema-refine <schema-file> --loosen-counts")
lines.append(" to automatically apply suggested improvements")
lines.append("")
return "\n".join(lines)
def analyze_schema_cli(schema_path: str, verbose: bool = False) -> int:
"""
CLI entry point for schema analysis.
Args:
schema_path: Path to schema file
verbose: Show detailed information
Returns:
Exit code (0 = success, 1 = rigid schema found)
"""
analyzer = SchemaAnalyzer()
try:
result = analyzer.analyze_schema_file(Path(schema_path))
report = analyzer.format_analysis_report(result, verbose=verbose)
print(report)
return 1 if result.is_rigid else 0
except FileNotFoundError:
print(f"Error: Schema file not found: {schema_path}")
return 2
except json.JSONDecodeError as e:
print(f"Error: Invalid JSON in schema file: {e}")
return 2
except Exception as e:
print(f"Error: {e}")
return 2