"""
Schema Refiner for Phase 2: Schema Refinement Tools

Automatically refines rigid schemas by applying loosening rules and fixes.
"""

from pathlib import Path
from typing import Dict, Any, List, Optional, Tuple
import json
import copy
from dataclasses import dataclass, field

from .schema_analyzer import SchemaAnalyzer, SchemaIssue, IssueType, IssueSeverity


@dataclass
class RefinementAction:
    """Represents a refinement action taken on the schema."""

    # Kind of issue this action addressed.
    issue_type: IssueType
    # Dotted path of the schema node the action refers to.
    path: str
    # Human-readable summary of what was changed (or detected).
    description: str
    # Constraint value(s) before and after the change; None when not applicable.
    old_value: Any = None
    new_value: Any = None


@dataclass
class RefinementResult:
    """Results of schema refinement."""

    # True when refinement ran to completion without raising.
    success: bool
    # One entry per fix that was actually applied (or reported).
    actions_taken: List[RefinementAction] = field(default_factory=list)
    # Refined deep copy of the input schema; None when refinement failed.
    refined_schema: Optional[Dict[str, Any]] = None
    # String form of the exception that aborted refinement, if any.
    error_message: Optional[str] = None


class SchemaRefiner:
    """Refines rigid schemas by applying loosening rules."""

    def __init__(self):
        """Initialize the schema refiner."""
        self.analyzer = SchemaAnalyzer()

    @staticmethod
    def _find_container(obj: Any, key: str) -> Optional[Dict[str, Any]]:
        """Return the dict that directly holds ``key``, or None.

        ``obj`` itself is preferred; otherwise ``obj["properties"]`` is tried.
        Anything that is not a dict (list, string, number, ...) never matches,
        so callers cannot end up indexing a non-mapping with a string key.
        """
        if not isinstance(obj, dict):
            return None
        if key in obj:
            return obj
        properties = obj.get("properties")
        if isinstance(properties, dict) and key in properties:
            return properties
        return None

    def _navigate_to_path(
        self, schema: Dict[str, Any], path: str
    ) -> Optional[Tuple[Dict[str, Any], str]]:
        """
        Navigate to a path in the schema, handling nested 'properties' objects.

        Each dot-separated part is looked up directly on the current object
        first and, failing that, inside its 'properties' mapping.

        Args:
            schema: The schema (or sub-schema) to walk
            path: Dot-separated path to the target property

        Returns:
            (parent_object, property_name) or None if path doesn't exist
            (including when the path runs through a non-dict value).
        """
        path_parts = path.split('.')
        obj: Any = schema

        # Navigate through all but the last part
        for part in path_parts[:-1]:
            container = self._find_container(obj, part)
            if container is None:
                return None
            obj = container[part]

        # For the final part, return the dict that actually holds the key so
        # callers can read or mutate it in place.
        prop_name = path_parts[-1]
        container = self._find_container(obj, prop_name)
        if container is None:
            return None
        return (container, prop_name)

    def refine_schema(
        self,
        schema: Dict[str, Any],
        loosen_counts: bool = True,
        migrate_deprecated: bool = False,
        round_numbers: bool = True
    ) -> RefinementResult:
        """
        Refine a schema by applying fixes for detected issues.

        The input schema is never modified; fixes are applied to a deep copy.
        Any exception raised during analysis or fixing is caught and reported
        through ``error_message`` with ``success`` left False.

        Args:
            schema: The JSON schema to refine
            loosen_counts: Apply fixes for exact counts
            migrate_deprecated: Migrate deprecated extensions
            round_numbers: Round overly specific numbers

        Returns:
            RefinementResult with actions taken and refined schema
        """
        result = RefinementResult(success=False)

        try:
            # Analyze the schema first
            analysis = self.analyzer.analyze_schema(schema)

            # Deep copy to avoid modifying original
            refined = copy.deepcopy(schema)

            # Apply fixes based on issues found
            for issue in analysis.issues:
                action = None

                if loosen_counts and issue.issue_type == IssueType.EXACT_COUNT:
                    action = self._fix_exact_count(refined, issue)
                elif round_numbers and issue.issue_type == IssueType.OVERLY_SPECIFIC:
                    action = self._fix_overly_specific(refined, issue)
                elif loosen_counts and issue.issue_type == IssueType.NO_FLEXIBILITY:
                    action = self._fix_no_flexibility(refined, issue)
                elif migrate_deprecated and issue.issue_type == IssueType.DEPRECATED_EXTENSIONS:
                    action = self._fix_deprecated_extension(refined, issue)

                if action is not None:
                    result.actions_taken.append(action)

            result.refined_schema = refined
            result.success = True

        except Exception as e:
            # Deliberate boundary: report the failure instead of propagating.
            result.error_message = str(e)

        return result

    def _fix_exact_count(
        self, schema: Dict[str, Any], issue: SchemaIssue
    ) -> Optional[RefinementAction]:
        """Fix exact count constraints by converting to ranges.

        Handles two shapes at ``issue.path``:
        an array whose ``minItems`` equals ``maxItems`` (widened to
        ``[max(0, n - 2), n + 5]``), and a ``const`` (an integer becomes a
        ``minimum``/``maximum`` range of +/-1; anything else is simply removed).

        Returns:
            The action describing the change, or None if nothing was changed.
        """
        nav_result = self._navigate_to_path(schema, issue.path)
        if not nav_result:
            return None

        obj, prop_name = nav_result
        prop_def = obj[prop_name]
        if not isinstance(prop_def, dict):
            return None

        # Check if it's an array with exact minItems/maxItems
        if prop_def.get("type") == "array":
            min_items = prop_def.get("minItems")
            max_items = prop_def.get("maxItems")

            if min_items is not None and max_items is not None and min_items == max_items:
                # Apply suggested loosening
                new_min = max(0, min_items - 2)
                new_max = min_items + 5

                prop_def["minItems"] = new_min
                prop_def["maxItems"] = new_max

                return RefinementAction(
                    issue_type=IssueType.EXACT_COUNT,
                    path=issue.path,
                    description=f"Loosened array count from exactly {min_items} to range {new_min}-{new_max}",
                    old_value={"minItems": min_items, "maxItems": max_items},
                    new_value={"minItems": new_min, "maxItems": new_max}
                )

        # Check if it's a const value
        if "const" in prop_def:
            const_value = prop_def.pop("const")

            # bool is a subclass of int in Python; a boolean const must not be
            # turned into a numeric range, so it is excluded explicitly.
            if isinstance(const_value, int) and not isinstance(const_value, bool):
                low = const_value - 1
                high = const_value + 1
                prop_def["minimum"] = low
                prop_def["maximum"] = high

                return RefinementAction(
                    issue_type=IssueType.EXACT_COUNT,
                    path=issue.path,
                    description=f"Converted const {const_value} to range {low}-{high}",
                    old_value=const_value,
                    new_value={"minimum": low, "maximum": high}
                )

            # For non-numeric constants, just remove the constraint
            return RefinementAction(
                issue_type=IssueType.EXACT_COUNT,
                path=issue.path,
                description=f"Removed const constraint: {const_value}",
                old_value=const_value,
                new_value=None
            )

        return None

    def _fix_overly_specific(
        self, schema: Dict[str, Any], issue: SchemaIssue
    ) -> Optional[RefinementAction]:
        """Fix overly specific number constraints by rounding.

        Replaces ``minItems`` at ``issue.path`` with ``issue.suggested_value``.

        Returns:
            The action describing the change, or None when the issue carries no
            suggested value, the path is missing, or there is no ``minItems``.
        """
        if issue.suggested_value is None:
            return None

        nav_result = self._navigate_to_path(schema, issue.path)
        if not nav_result:
            return None

        obj, prop_name = nav_result
        prop_def = obj[prop_name]

        # Round the minItems value
        if isinstance(prop_def, dict) and "minItems" in prop_def:
            old_value = prop_def["minItems"]
            new_value = issue.suggested_value
            prop_def["minItems"] = new_value

            return RefinementAction(
                issue_type=IssueType.OVERLY_SPECIFIC,
                path=issue.path,
                description=f"Rounded minItems from {old_value} to {new_value}",
                old_value=old_value,
                new_value=new_value
            )

        return None

    def _fix_no_flexibility(
        self, schema: Dict[str, Any], issue: SchemaIssue
    ) -> Optional[RefinementAction]:
        """Fix narrow ranges by widening them.

        When the node at ``issue.path`` has both ``minimum`` and ``maximum``,
        each bound is moved outward by 5.

        Returns:
            The action describing the change, or None if nothing was changed.
        """
        nav_result = self._navigate_to_path(schema, issue.path)
        if not nav_result:
            return None

        obj, prop_name = nav_result
        prop_def = obj[prop_name]

        if isinstance(prop_def, dict) and "minimum" in prop_def and "maximum" in prop_def:
            old_min = prop_def["minimum"]
            old_max = prop_def["maximum"]

            # Widen the range
            new_min = old_min - 5
            new_max = old_max + 5

            prop_def["minimum"] = new_min
            prop_def["maximum"] = new_max

            return RefinementAction(
                issue_type=IssueType.NO_FLEXIBILITY,
                path=issue.path,
                description=f"Widened range from {old_min}-{old_max} to {new_min}-{new_max}",
                old_value={"minimum": old_min, "maximum": old_max},
                new_value={"minimum": new_min, "maximum": new_max}
            )

        return None

    def _fix_deprecated_extension(
        self, schema: Dict[str, Any], issue: SchemaIssue
    ) -> Optional[RefinementAction]:
        """Report a deprecated extension (migration requires manual work).

        The schema is left untouched: the key named by ``issue.path`` is only
        looked up at the top level and reported, never removed.

        Returns:
            An action recording the detected key, or None if it is not a
            top-level key of ``schema``.
        """
        # For now, just document that manual migration is needed
        # Full migration would require understanding the old format
        deprecated_key = issue.path

        if deprecated_key in schema:
            old_value = schema[deprecated_key]
            # Don't actually remove it automatically - too risky

            return RefinementAction(
                issue_type=IssueType.DEPRECATED_EXTENSIONS,
                path=issue.path,
                description="Detected deprecated extension (manual migration recommended)",
                old_value=old_value,
                new_value=None
            )

        return None

    def refine_schema_file(
        self,
        input_path: Path,
        output_path: Optional[Path] = None,
        loosen_counts: bool = True,
        migrate_deprecated: bool = False,
        round_numbers: bool = True
    ) -> RefinementResult:
        """
        Refine a schema file.

        The refined schema is written whenever refinement succeeds, even if no
        actions were taken.

        Args:
            input_path: Path to input schema file
            output_path: Path to output file (if None, overwrites input)
            loosen_counts: Apply fixes for exact counts
            migrate_deprecated: Migrate deprecated extensions
            round_numbers: Round overly specific numbers

        Returns:
            RefinementResult

        Raises:
            OSError: If the input cannot be read or the output cannot be written.
            json.JSONDecodeError: If the input file is not valid JSON.
        """
        with open(input_path, encoding="utf-8") as f:
            schema = json.load(f)

        result = self.refine_schema(
            schema,
            loosen_counts=loosen_counts,
            migrate_deprecated=migrate_deprecated,
            round_numbers=round_numbers
        )

        # Compare against None rather than truthiness so that an empty schema
        # ({}) that refined successfully is still written out.
        if result.success and result.refined_schema is not None:
            output = output_path or input_path
            with open(output, 'w', encoding="utf-8") as f:
                json.dump(result.refined_schema, f, indent=2)

        return result

    def format_refinement_report(self, result: RefinementResult) -> str:
        """
        Format refinement results as a human-readable report.

        Args:
            result: Refinement results

        Returns:
            Formatted report string
        """
        lines = []

        # Header
        lines.append("=" * 70)
        lines.append("Schema Refinement Report")
        lines.append("=" * 70)
        lines.append("")

        if not result.success:
            lines.append(f"❌ Refinement failed: {result.error_message}")
            return "\n".join(lines)

        # Summary
        action_count = len(result.actions_taken)
        if action_count == 0:
            lines.append("✅ No refinements needed - schema is already flexible")
        else:
            lines.append(f"✅ Applied {action_count} refinement(s)")
        lines.append("")

        # List actions
        if result.actions_taken:
            lines.append("Actions Taken:")
            lines.append("-" * 70)

            for i, action in enumerate(result.actions_taken, 1):
                lines.append(f"{i}. {action.description}")
                lines.append(f" Path: {action.path}")
                if action.old_value is not None:
                    lines.append(f" Before: {json.dumps(action.old_value)}")
                if action.new_value is not None:
                    lines.append(f" After: {json.dumps(action.new_value)}")
                lines.append("")

        return "\n".join(lines)


def refine_schema_cli(
    schema_path: str,
    output: Optional[str] = None,
    loosen_counts: bool = True,
    migrate_deprecated: bool = False,
    round_numbers: bool = True,
    dry_run: bool = False
) -> int:
    """
    CLI entry point for schema refinement.

    Prints the refinement report (or an error message) to stdout.

    Args:
        schema_path: Path to schema file
        output: Output path (None = overwrite input)
        loosen_counts: Apply count loosening fixes
        migrate_deprecated: Migrate deprecated extensions
        round_numbers: Round overly specific numbers
        dry_run: Show changes without applying

    Returns:
        Exit code (0 = success, 1 = no changes needed, 2 = error)
    """
    refiner = SchemaRefiner()

    try:
        input_path = Path(schema_path)
        output_path = Path(output) if output else None

        if dry_run:
            # Just analyze and show what would be done; nothing is written.
            with open(input_path, encoding="utf-8") as f:
                schema = json.load(f)

            result = refiner.refine_schema(
                schema,
                loosen_counts=loosen_counts,
                migrate_deprecated=migrate_deprecated,
                round_numbers=round_numbers
            )

            print("DRY RUN - No changes will be made")
            print()
        else:
            result = refiner.refine_schema_file(
                input_path,
                output_path,
                loosen_counts=loosen_counts,
                migrate_deprecated=migrate_deprecated,
                round_numbers=round_numbers
            )

        report = refiner.format_refinement_report(result)
        print(report)

        if result.success and len(result.actions_taken) > 0:
            return 0  # Success with changes
        elif result.success:
            return 1  # Success but no changes needed
        else:
            return 2  # Error

    except FileNotFoundError:
        print(f"Error: Schema file not found: {schema_path}")
        return 2
    except json.JSONDecodeError as e:
        print(f"Error: Invalid JSON in schema file: {e}")
        return 2
    except Exception as e:
        # Top-level CLI boundary: report anything unexpected as exit code 2.
        print(f"Error: {e}")
        return 2