diff --git a/markitect/cli.py b/markitect/cli.py index 0302cff0..254fc065 100644 --- a/markitect/cli.py +++ b/markitect/cli.py @@ -1872,6 +1872,89 @@ def schema_delete(config, schema_name, confirm): sys.exit(1) +@cli.command('schema-analyze') +@click.argument('schema_file', type=click.Path(exists=True)) +@click.option('--verbose', '-v', is_flag=True, help='Show detailed analysis') +@pass_config +def schema_analyze_cmd(config, schema_file, verbose): + """ + Analyze a schema for rigidity issues and suggest improvements. + + Examines JSON schemas to detect: + - Exact counts that should be ranges + - Missing classification system + - Deprecated extensions + - Overly specific constraints + + Returns exit code 0 for flexible schemas, 1 for rigid schemas, 2 for errors. + + Examples: + markitect schema-analyze schema.json + markitect schema-analyze schema.json --verbose + """ + from .schema_analyzer import analyze_schema_cli + sys.exit(analyze_schema_cli(schema_file, verbose=verbose)) + + +@cli.command('schema-refine') +@click.argument('schema_file', type=click.Path(exists=True)) +@click.option('--output', '-o', type=click.Path(), + help='Output file (default: overwrite input file)') +@click.option('--loosen-counts', is_flag=True, default=True, + help='Convert exact counts to flexible ranges (default: enabled)') +@click.option('--no-loosen-counts', is_flag=True, + help='Disable count loosening') +@click.option('--round-numbers', is_flag=True, default=True, + help='Round overly specific numbers (default: enabled)') +@click.option('--no-round-numbers', is_flag=True, + help='Disable number rounding') +@click.option('--migrate-deprecated', is_flag=True, default=False, + help='Migrate deprecated extensions (requires manual review)') +@click.option('--dry-run', is_flag=True, + help='Show changes without applying them') +@pass_config +def schema_refine_cmd(config, schema_file, output, loosen_counts, no_loosen_counts, + round_numbers, no_round_numbers, migrate_deprecated, dry_run): + """ + Refine a schema by automatically applying fixes for rigidity issues. + + This command analyzes the schema and applies automatic fixes: + - Converts exact counts to flexible ranges + - Rounds overly specific numbers + - Widens narrow integer constraints + - Documents deprecated extension usage + + By default, the input file is overwritten. Use --output to save to a different file. + + Examples: + # Refine schema in place + markitect schema-refine schema.json + + # Preview changes without applying + markitect schema-refine schema.json --dry-run + + # Save refined schema to new file + markitect schema-refine schema.json --output refined-schema.json + + # Disable specific refinements + markitect schema-refine schema.json --no-loosen-counts + """ + from .schema_refiner import refine_schema_cli + + # Handle flag conflicts + loosen = loosen_counts and not no_loosen_counts + round_nums = round_numbers and not no_round_numbers + + sys.exit(refine_schema_cli( + schema_file, + output=output, + loosen_counts=loosen, + migrate_deprecated=migrate_deprecated, + round_numbers=round_nums, + dry_run=dry_run + )) + + @cli.command('generate-stub') @click.argument('schema_file', type=click.Path(exists=True, path_type=Path)) @click.option('--output', '-o', type=click.Path(path_type=Path), diff --git a/markitect/schema_analyzer.py b/markitect/schema_analyzer.py new file mode 100644 index 00000000..94d259be --- /dev/null +++ b/markitect/schema_analyzer.py @@ -0,0 +1,352 @@ +""" +Schema Analyzer for Phase 2: Schema Refinement Tools + +Analyzes JSON schemas to detect rigidity issues and provide suggestions +for improvement using the Phase 1 classification system. +""" + +from pathlib import Path +from typing import Dict, Any, List, Optional, Tuple +import json +from dataclasses import dataclass, field +from enum import Enum + + +class IssueType(Enum): + """Types of schema rigidity issues.""" + EXACT_COUNT = "exact_count" + MISSING_CLASSIFICATIONS = "missing_classifications" + MISSING_CONTENT_INSTRUCTIONS = "missing_content_instructions" + OVERLY_SPECIFIC = "overly_specific" + NO_FLEXIBILITY = "no_flexibility" + DEPRECATED_EXTENSIONS = "deprecated_extensions" + + +class IssueSeverity(Enum): + """Severity levels for schema issues.""" + INFO = "info" + WARNING = "warning" + ERROR = "error" + + +@dataclass +class SchemaIssue: + """Represents a detected schema issue.""" + issue_type: IssueType + severity: IssueSeverity + path: str + message: str + suggestion: str + current_value: Any = None + suggested_value: Any = None + + +@dataclass +class SchemaAnalysisResult: + """Results of schema analysis.""" + is_rigid: bool + rigidity_score: int # 0-100, higher = more rigid + issues: List[SchemaIssue] = field(default_factory=list) + has_classifications: bool = False + has_content_control: bool = False + uses_deprecated_extensions: bool = False + + @property + def issue_count_by_severity(self) -> Dict[IssueSeverity, int]: + """Count issues by severity.""" + counts = {severity: 0 for severity in IssueSeverity} + for issue in self.issues: + counts[issue.severity] += 1 + return counts + + +class SchemaAnalyzer: + """Analyzes schemas for rigidity and suggests improvements.""" + + def __init__(self): + """Initialize the schema analyzer.""" + self.deprecated_extensions = [ + "x-markitect-required-sections", + "x-markitect-recommended-sections", + "x-markitect-optional-sections" + ] + + def analyze_schema(self, schema: Dict[str, Any]) -> SchemaAnalysisResult: + """ + Analyze a schema for rigidity issues. + + Args: + schema: The JSON schema to analyze + + Returns: + SchemaAnalysisResult with detected issues and suggestions + """ + result = SchemaAnalysisResult(is_rigid=False, rigidity_score=0) + + # Check for Phase 1 features + result.has_classifications = "x-markitect-sections" in schema + result.has_content_control = "x-markitect-content-control" in schema + + # Check for deprecated extensions + for deprecated in self.deprecated_extensions: + if deprecated in schema: + result.uses_deprecated_extensions = True + result.issues.append(SchemaIssue( + issue_type=IssueType.DEPRECATED_EXTENSIONS, + severity=IssueSeverity.WARNING, + path=deprecated, + message=f"Using deprecated extension '{deprecated}'", + suggestion=f"Migrate to 'x-markitect-sections' with classification system" + )) + + # Analyze properties for rigidity + if "properties" in schema: + self._analyze_properties(schema["properties"], result, "properties") + + # Check for missing classifications + if not result.has_classifications: + result.issues.append(SchemaIssue( + issue_type=IssueType.MISSING_CLASSIFICATIONS, + severity=IssueSeverity.INFO, + path="root", + message="Schema does not use section classification system", + suggestion="Add 'x-markitect-sections' to classify sections as required/recommended/optional/discouraged/improper" + )) + + # Check for missing content control + if not result.has_content_control: + result.issues.append(SchemaIssue( + issue_type=IssueType.MISSING_CONTENT_INSTRUCTIONS, + severity=IssueSeverity.INFO, + path="root", + message="Schema does not provide content control", + suggestion="Add 'x-markitect-content-control' for pattern validation and quality metrics" + )) + + # Calculate rigidity score + result.rigidity_score = self._calculate_rigidity_score(result) + result.is_rigid = result.rigidity_score > 50 + + return result + + def _analyze_properties(self, properties: Dict[str, Any], result: SchemaAnalysisResult, path: str): + """Analyze schema properties for rigidity issues.""" + for prop_name, prop_def in properties.items(): + prop_path = f"{path}.{prop_name}" + + if not isinstance(prop_def, dict): + continue + + # Check for exact counts (const) + if "const" in prop_def: + result.issues.append(SchemaIssue( + issue_type=IssueType.EXACT_COUNT, + severity=IssueSeverity.WARNING, + path=prop_path, + message=f"Property '{prop_name}' requires exact value", + suggestion=f"Consider using a range or removing constraint for flexibility", + current_value=prop_def["const"] + )) + + # Check for arrays with exact counts + if prop_def.get("type") == "array": + min_items = prop_def.get("minItems") + max_items = prop_def.get("maxItems") + + if min_items is not None and max_items is not None and min_items == max_items: + result.issues.append(SchemaIssue( + issue_type=IssueType.EXACT_COUNT, + severity=IssueSeverity.WARNING, + path=prop_path, + message=f"Array '{prop_name}' requires exactly {min_items} items", + suggestion=f"Use a range like minItems: {max(0, min_items - 2)}, maxItems: {min_items + 5}", + current_value={"minItems": min_items, "maxItems": max_items}, + suggested_value={ + "minItems": max(0, min_items - 2), + "maxItems": min_items + 5 + } + )) + + # Check for overly specific counts (large numbers) + if min_items is not None and min_items > 50: + result.issues.append(SchemaIssue( + issue_type=IssueType.OVERLY_SPECIFIC, + severity=IssueSeverity.INFO, + path=prop_path, + message=f"Array '{prop_name}' has very specific minItems: {min_items}", + suggestion=f"Consider rounding to {(min_items // 10) * 10} for flexibility", + current_value=min_items, + suggested_value=(min_items // 10) * 10 + )) + + # Check for overly specific integer constraints + if prop_def.get("type") == "integer": + if "minimum" in prop_def and "maximum" in prop_def: + min_val = prop_def["minimum"] + max_val = prop_def["maximum"] + range_size = max_val - min_val + + if range_size < 3: + result.issues.append(SchemaIssue( + issue_type=IssueType.NO_FLEXIBILITY, + severity=IssueSeverity.INFO, + path=prop_path, + message=f"Integer '{prop_name}' has very narrow range: {min_val}-{max_val}", + suggestion=f"Consider widening range for flexibility", + current_value={"minimum": min_val, "maximum": max_val} + )) + + # Recursively check nested properties + if "properties" in prop_def: + self._analyze_properties(prop_def["properties"], result, prop_path) + + # Check items schema for arrays + if "items" in prop_def and isinstance(prop_def["items"], dict): + if "properties" in prop_def["items"]: + self._analyze_properties( + prop_def["items"]["properties"], + result, + f"{prop_path}.items" + ) + + def _calculate_rigidity_score(self, result: SchemaAnalysisResult) -> int: + """ + Calculate overall rigidity score (0-100). + + Higher score = more rigid schema. + """ + score = 0 + + # Count issues by type with weighted scores + weights = { + IssueType.EXACT_COUNT: 15, + IssueType.OVERLY_SPECIFIC: 10, + IssueType.NO_FLEXIBILITY: 8, + IssueType.MISSING_CLASSIFICATIONS: 5, + IssueType.MISSING_CONTENT_INSTRUCTIONS: 3, + IssueType.DEPRECATED_EXTENSIONS: 5 + } + + for issue in result.issues: + score += weights.get(issue.issue_type, 5) + + # Cap at 100 + return min(100, score) + + def analyze_schema_file(self, schema_path: Path) -> SchemaAnalysisResult: + """ + Analyze a schema file. + + Args: + schema_path: Path to JSON schema file + + Returns: + SchemaAnalysisResult + """ + with open(schema_path) as f: + schema = json.load(f) + + return self.analyze_schema(schema) + + def format_analysis_report(self, result: SchemaAnalysisResult, verbose: bool = False) -> str: + """ + Format analysis results as a human-readable report. + + Args: + result: Analysis results + verbose: Include detailed information + + Returns: + Formatted report string + """ + lines = [] + + # Header + lines.append("=" * 70) + lines.append("Schema Analysis Report") + lines.append("=" * 70) + lines.append("") + + # Overall assessment + rigidity_level = "HIGH" if result.rigidity_score > 70 else "MEDIUM" if result.rigidity_score > 40 else "LOW" + lines.append(f"Rigidity Score: {result.rigidity_score}/100 ({rigidity_level})") + lines.append(f"Status: {'RIGID - Needs refinement' if result.is_rigid else 'FLEXIBLE - Good'}") + lines.append("") + + # Features check + lines.append("Phase 1 Features:") + lines.append(f" ✓ Classifications: {'Yes' if result.has_classifications else 'No'}") + lines.append(f" ✓ Content Control: {'Yes' if result.has_content_control else 'No'}") + if result.uses_deprecated_extensions: + lines.append(f" ⚠ Deprecated Extensions: Yes (needs migration)") + lines.append("") + + # Issue summary + counts = result.issue_count_by_severity + lines.append(f"Issues Found: {len(result.issues)} total") + lines.append(f" - Errors: {counts[IssueSeverity.ERROR]}") + lines.append(f" - Warnings: {counts[IssueSeverity.WARNING]}") + lines.append(f" - Info: {counts[IssueSeverity.INFO]}") + lines.append("") + + # List issues + if result.issues: + lines.append("Detected Issues:") + lines.append("-" * 70) + + for i, issue in enumerate(result.issues, 1): + severity_icon = "❌" if issue.severity == IssueSeverity.ERROR else "⚠️ " if issue.severity == IssueSeverity.WARNING else "ℹ️ " + lines.append(f"{i}. {severity_icon} {issue.message}") + lines.append(f" Path: {issue.path}") + lines.append(f" Suggestion: {issue.suggestion}") + + if verbose and issue.current_value is not None: + lines.append(f" Current: {json.dumps(issue.current_value)}") + if verbose and issue.suggested_value is not None: + lines.append(f" Suggested: {json.dumps(issue.suggested_value)}") + + lines.append("") + else: + lines.append("✅ No issues found - schema is well-designed!") + lines.append("") + + # Recommendations + if result.is_rigid: + lines.append("Recommendations:") + lines.append("-" * 70) + lines.append("Run: markitect schema-refine --loosen-counts") + lines.append(" to automatically apply suggested improvements") + lines.append("") + + return "\n".join(lines) + + +def analyze_schema_cli(schema_path: str, verbose: bool = False) -> int: + """ + CLI entry point for schema analysis. + + Args: + schema_path: Path to schema file + verbose: Show detailed information + + Returns: + Exit code (0 = success, 1 = rigid schema found) + """ + analyzer = SchemaAnalyzer() + + try: + result = analyzer.analyze_schema_file(Path(schema_path)) + report = analyzer.format_analysis_report(result, verbose=verbose) + print(report) + + return 1 if result.is_rigid else 0 + + except FileNotFoundError: + print(f"Error: Schema file not found: {schema_path}") + return 2 + except json.JSONDecodeError as e: + print(f"Error: Invalid JSON in schema file: {e}") + return 2 + except Exception as e: + print(f"Error: {e}") + return 2 diff --git a/markitect/schema_refiner.py b/markitect/schema_refiner.py new file mode 100644 index 00000000..46893c05 --- /dev/null +++ b/markitect/schema_refiner.py @@ -0,0 +1,420 @@ +""" +Schema Refiner for Phase 2: Schema Refinement Tools + +Automatically refines rigid schemas by applying loosening rules and fixes. +""" + +from pathlib import Path +from typing import Dict, Any, List, Optional, Tuple +import json +import copy +from dataclasses import dataclass, field + +from .schema_analyzer import SchemaAnalyzer, SchemaIssue, IssueType, IssueSeverity + + +@dataclass +class RefinementAction: + """Represents a refinement action taken on the schema.""" + issue_type: IssueType + path: str + description: str + old_value: Any = None + new_value: Any = None + + +@dataclass +class RefinementResult: + """Results of schema refinement.""" + success: bool + actions_taken: List[RefinementAction] = field(default_factory=list) + refined_schema: Optional[Dict[str, Any]] = None + error_message: Optional[str] = None + + +class SchemaRefiner: + """Refines rigid schemas by applying loosening rules.""" + + def __init__(self): + """Initialize the schema refiner.""" + self.analyzer = SchemaAnalyzer() + + def _navigate_to_path(self, schema: Dict[str, Any], path: str) -> Optional[Tuple[Dict[str, Any], str]]: + """ + Navigate to a path in the schema, handling nested 'properties' objects. + + Returns (parent_object, property_name) or None if path doesn't exist. + """ + path_parts = path.split('.') + obj = schema + + # Navigate through all but the last part + for i, part in enumerate(path_parts[:-1]): + # Try direct access first + if part in obj: + obj = obj[part] + # If not found and obj has 'properties', try there + elif isinstance(obj, dict) and "properties" in obj and part in obj["properties"]: + obj = obj["properties"][part] + else: + return None + + # For the final part, check if we need to descend into 'properties' + prop_name = path_parts[-1] + if prop_name in obj: + return (obj, prop_name) + elif isinstance(obj, dict) and "properties" in obj and prop_name in obj["properties"]: + return (obj["properties"], prop_name) + else: + return None + + def refine_schema( + self, + schema: Dict[str, Any], + loosen_counts: bool = True, + migrate_deprecated: bool = False, + round_numbers: bool = True + ) -> RefinementResult: + """ + Refine a schema by applying fixes for detected issues. + + Args: + schema: The JSON schema to refine + loosen_counts: Apply fixes for exact counts + migrate_deprecated: Migrate deprecated extensions + round_numbers: Round overly specific numbers + + Returns: + RefinementResult with actions taken and refined schema + """ + result = RefinementResult(success=False) + + try: + # Analyze the schema first + analysis = self.analyzer.analyze_schema(schema) + + # Deep copy to avoid modifying original + refined = copy.deepcopy(schema) + + # Apply fixes based on issues found + for issue in analysis.issues: + action = None + + if loosen_counts and issue.issue_type == IssueType.EXACT_COUNT: + action = self._fix_exact_count(refined, issue) + + elif round_numbers and issue.issue_type == IssueType.OVERLY_SPECIFIC: + action = self._fix_overly_specific(refined, issue) + + elif loosen_counts and issue.issue_type == IssueType.NO_FLEXIBILITY: + action = self._fix_no_flexibility(refined, issue) + + elif migrate_deprecated and issue.issue_type == IssueType.DEPRECATED_EXTENSIONS: + action = self._fix_deprecated_extension(refined, issue) + + if action: + result.actions_taken.append(action) + + result.refined_schema = refined + result.success = True + + except Exception as e: + result.error_message = str(e) + + return result + + def _fix_exact_count(self, schema: Dict[str, Any], issue: SchemaIssue) -> Optional[RefinementAction]: + """Fix exact count constraints by converting to ranges.""" + nav_result = self._navigate_to_path(schema, issue.path) + if not nav_result: + return None + + obj, prop_name = nav_result + prop_def = obj[prop_name] + old_value = copy.deepcopy(prop_def) + + # Check if it's an array with exact minItems/maxItems + if isinstance(prop_def, dict) and prop_def.get("type") == "array": + min_items = prop_def.get("minItems") + max_items = prop_def.get("maxItems") + + if min_items is not None and max_items is not None and min_items == max_items: + # Apply suggested loosening + new_min = max(0, min_items - 2) + new_max = min_items + 5 + + prop_def["minItems"] = new_min + prop_def["maxItems"] = new_max + + return RefinementAction( + issue_type=IssueType.EXACT_COUNT, + path=issue.path, + description=f"Loosened array count from exactly {min_items} to range {new_min}-{new_max}", + old_value={"minItems": min_items, "maxItems": max_items}, + new_value={"minItems": new_min, "maxItems": new_max} + ) + + # Check if it's a const value + if isinstance(prop_def, dict) and "const" in prop_def: + const_value = prop_def["const"] + del prop_def["const"] + + # If it's a number, convert to a range + if isinstance(const_value, int): + prop_def["minimum"] = const_value - 1 + prop_def["maximum"] = const_value + 1 + + return RefinementAction( + issue_type=IssueType.EXACT_COUNT, + path=issue.path, + description=f"Converted const {const_value} to range {const_value-1}-{const_value+1}", + old_value=const_value, + new_value={"minimum": const_value - 1, "maximum": const_value + 1} + ) + else: + # For non-numeric constants, just remove the constraint + return RefinementAction( + issue_type=IssueType.EXACT_COUNT, + path=issue.path, + description=f"Removed const constraint: {const_value}", + old_value=const_value, + new_value=None + ) + + return None + + def _fix_overly_specific(self, schema: Dict[str, Any], issue: SchemaIssue) -> Optional[RefinementAction]: + """Fix overly specific number constraints by rounding.""" + if issue.suggested_value is None: + return None + + nav_result = self._navigate_to_path(schema, issue.path) + if not nav_result: + return None + + obj, prop_name = nav_result + prop_def = obj[prop_name] + + # Round the minItems value + if isinstance(prop_def, dict) and "minItems" in prop_def: + old_value = prop_def["minItems"] + new_value = issue.suggested_value + prop_def["minItems"] = new_value + + return RefinementAction( + issue_type=IssueType.OVERLY_SPECIFIC, + path=issue.path, + description=f"Rounded minItems from {old_value} to {new_value}", + old_value=old_value, + new_value=new_value + ) + + return None + + def _fix_no_flexibility(self, schema: Dict[str, Any], issue: SchemaIssue) -> Optional[RefinementAction]: + """Fix narrow ranges by widening them.""" + nav_result = self._navigate_to_path(schema, issue.path) + if not nav_result: + return None + + obj, prop_name = nav_result + prop_def = obj[prop_name] + + if isinstance(prop_def, dict) and "minimum" in prop_def and "maximum" in prop_def: + old_min = prop_def["minimum"] + old_max = prop_def["maximum"] + range_size = old_max - old_min + + # Widen the range + new_min = old_min - 5 + new_max = old_max + 5 + + prop_def["minimum"] = new_min + prop_def["maximum"] = new_max + + return RefinementAction( + issue_type=IssueType.NO_FLEXIBILITY, + path=issue.path, + description=f"Widened range from {old_min}-{old_max} to {new_min}-{new_max}", + old_value={"minimum": old_min, "maximum": old_max}, + new_value={"minimum": new_min, "maximum": new_max} + ) + + return None + + def _fix_deprecated_extension(self, schema: Dict[str, Any], issue: SchemaIssue) -> Optional[RefinementAction]: + """Remove deprecated extension (migration requires manual work).""" + # For now, just document that manual migration is needed + # Full migration would require understanding the old format + + deprecated_key = issue.path + if deprecated_key in schema: + old_value = schema[deprecated_key] + # Don't actually remove it automatically - too risky + return RefinementAction( + issue_type=IssueType.DEPRECATED_EXTENSIONS, + path=issue.path, + description=f"Detected deprecated extension (manual migration recommended)", + old_value=old_value, + new_value=None + ) + + return None + + def refine_schema_file( + self, + input_path: Path, + output_path: Optional[Path] = None, + loosen_counts: bool = True, + migrate_deprecated: bool = False, + round_numbers: bool = True + ) -> RefinementResult: + """ + Refine a schema file. + + Args: + input_path: Path to input schema file + output_path: Path to output file (if None, overwrites input) + loosen_counts: Apply fixes for exact counts + migrate_deprecated: Migrate deprecated extensions + round_numbers: Round overly specific numbers + + Returns: + RefinementResult + """ + with open(input_path) as f: + schema = json.load(f) + + result = self.refine_schema( + schema, + loosen_counts=loosen_counts, + migrate_deprecated=migrate_deprecated, + round_numbers=round_numbers + ) + + if result.success and result.refined_schema: + output = output_path or input_path + with open(output, 'w') as f: + json.dump(result.refined_schema, f, indent=2) + + return result + + def format_refinement_report(self, result: RefinementResult) -> str: + """ + Format refinement results as a human-readable report. + + Args: + result: Refinement results + + Returns: + Formatted report string + """ + lines = [] + + # Header + lines.append("=" * 70) + lines.append("Schema Refinement Report") + lines.append("=" * 70) + lines.append("") + + if not result.success: + lines.append(f"❌ Refinement failed: {result.error_message}") + return "\n".join(lines) + + # Summary + action_count = len(result.actions_taken) + if action_count == 0: + lines.append("✅ No refinements needed - schema is already flexible") + else: + lines.append(f"✅ Applied {action_count} refinement(s)") + lines.append("") + + # List actions + if result.actions_taken: + lines.append("Actions Taken:") + lines.append("-" * 70) + + for i, action in enumerate(result.actions_taken, 1): + lines.append(f"{i}. {action.description}") + lines.append(f" Path: {action.path}") + + if action.old_value is not None: + lines.append(f" Before: {json.dumps(action.old_value)}") + if action.new_value is not None: + lines.append(f" After: {json.dumps(action.new_value)}") + + lines.append("") + + return "\n".join(lines) + + +def refine_schema_cli( + schema_path: str, + output: Optional[str] = None, + loosen_counts: bool = True, + migrate_deprecated: bool = False, + round_numbers: bool = True, + dry_run: bool = False +) -> int: + """ + CLI entry point for schema refinement. + + Args: + schema_path: Path to schema file + output: Output path (None = overwrite input) + loosen_counts: Apply count loosening fixes + migrate_deprecated: Migrate deprecated extensions + round_numbers: Round overly specific numbers + dry_run: Show changes without applying + + Returns: + Exit code (0 = success, 1 = no changes needed, 2 = error) + """ + refiner = SchemaRefiner() + + try: + input_path = Path(schema_path) + output_path = Path(output) if output else None + + if dry_run: + # Just analyze and show what would be done + with open(input_path) as f: + schema = json.load(f) + + result = refiner.refine_schema( + schema, + loosen_counts=loosen_counts, + migrate_deprecated=migrate_deprecated, + round_numbers=round_numbers + ) + + print("DRY RUN - No changes will be made") + print() + else: + result = refiner.refine_schema_file( + input_path, + output_path, + loosen_counts=loosen_counts, + migrate_deprecated=migrate_deprecated, + round_numbers=round_numbers + ) + + report = refiner.format_refinement_report(result) + print(report) + + if result.success and len(result.actions_taken) > 0: + return 0 # Success with changes + elif result.success: + return 1 # Success but no changes needed + else: + return 2 # Error + + except FileNotFoundError: + print(f"Error: Schema file not found: {schema_path}") + return 2 + except json.JSONDecodeError as e: + print(f"Error: Invalid JSON in schema file: {e}") + return 2 + except Exception as e: + print(f"Error: {e}") + return 2