Files
markitect-main/markitect/schema_refiner.py
tegwick 48e0b60be5 feat: add interactive mode to schema-refine command
Added --interactive/-i flag to schema-refine command that allows users to
review and approve each refinement individually:

- Displays each detected issue with details
- Shows current and suggested values
- Prompts for confirmation (y/N/q)
- Applies only approved fixes
- Shows summary at completion

This gives users fine-grained control over which refinements to apply.

Example usage:
  markitect schema-refine schema.json --interactive

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-04 21:30:55 +01:00

531 lines
18 KiB
Python

"""
Schema Refiner for Phase 2: Schema Refinement Tools
Automatically refines rigid schemas by applying loosening rules and fixes.
"""
from pathlib import Path
from typing import Dict, Any, List, Optional, Tuple
import json
import copy
from dataclasses import dataclass, field
from .schema_analyzer import SchemaAnalyzer, SchemaIssue, IssueType, IssueSeverity
@dataclass
class RefinementAction:
    """Record of a single change that a fixer made (or reported) on a schema."""

    # Category of the issue that triggered this action.
    issue_type: IssueType
    # Dotted path of the schema element the action refers to.
    path: str
    # Human-readable summary of what was done.
    description: str
    # Value(s) before the change; None when there is nothing to show.
    old_value: Any = None
    # Value(s) after the change; None when there is nothing to show.
    new_value: Any = None
@dataclass
class RefinementResult:
    """Outcome of one refinement run over a schema."""

    # True once the run completed without raising or being cancelled.
    success: bool
    # Actions recorded during the run, in the order they were produced.
    actions_taken: List[RefinementAction] = field(default_factory=list)
    # The refined copy of the schema; stays None until the run completes.
    refined_schema: Optional[Dict[str, Any]] = None
    # Text of the exception that aborted the run, if any.
    error_message: Optional[str] = None
class SchemaRefiner:
    """Refines rigid schemas by applying loosening rules.

    Issues are detected with ``SchemaAnalyzer.analyze_schema`` and the fixes
    are applied to a deep copy of the input schema, so the schema object
    passed in by the caller is never modified.
    """

    def __init__(self):
        """Initialize the schema refiner."""
        self.analyzer = SchemaAnalyzer()

    def _navigate_to_path(self, schema: Dict[str, Any], path: str) -> Optional[Tuple[Dict[str, Any], str]]:
        """
        Navigate to a path in the schema, handling nested 'properties' objects.

        Each dot-separated segment is looked up directly on the current
        object first and, failing that, inside its ``"properties"`` mapping.

        Args:
            schema: The schema (or sub-schema) to search
            path: Dot-separated path such as ``"section.items"``

        Returns:
            (parent_object, property_name) or None if the path doesn't exist
            or passes through a value that is not a JSON object.
        """
        *parent_parts, prop_name = path.split('.')
        obj: Any = schema

        # Navigate through all but the last part
        for part in parent_parts:
            # A non-dict node (string, list, number, ...) cannot contain the
            # next segment; bail out instead of raising TypeError.
            if not isinstance(obj, dict):
                return None
            if part in obj:
                obj = obj[part]
                continue
            props = obj.get("properties")
            if isinstance(props, dict) and part in props:
                obj = props[part]
            else:
                return None

        # For the final part, check if we need to descend into 'properties'
        if not isinstance(obj, dict):
            return None
        if prop_name in obj:
            return (obj, prop_name)
        props = obj.get("properties")
        if isinstance(props, dict) and prop_name in props:
            return (props, prop_name)
        return None

    def _apply_fix(
        self,
        schema: Dict[str, Any],
        issue: SchemaIssue,
        loosen_counts: bool,
        migrate_deprecated: bool,
        round_numbers: bool
    ) -> Optional[RefinementAction]:
        """
        Dispatch an issue to the fixer for its type, honouring the enable flags.

        Shared by the batch and interactive refinement entry points so both
        apply exactly the same rules.

        Returns:
            The RefinementAction produced by the fixer, or None when the
            issue type is disabled, not handled, or the fixer found nothing
            to change.
        """
        issue_type = issue.issue_type
        if loosen_counts and issue_type == IssueType.EXACT_COUNT:
            return self._fix_exact_count(schema, issue)
        if round_numbers and issue_type == IssueType.OVERLY_SPECIFIC:
            return self._fix_overly_specific(schema, issue)
        if loosen_counts and issue_type == IssueType.NO_FLEXIBILITY:
            return self._fix_no_flexibility(schema, issue)
        if migrate_deprecated and issue_type == IssueType.DEPRECATED_EXTENSIONS:
            return self._fix_deprecated_extension(schema, issue)
        return None

    def refine_schema_interactive(
        self,
        schema: Dict[str, Any],
        loosen_counts: bool = True,
        migrate_deprecated: bool = False,
        round_numbers: bool = True
    ) -> RefinementResult:
        """
        Refine a schema interactively, prompting for each fix.

        Every detected issue is printed and the user is asked ``[y/N/q]``:
        ``y`` applies the fix, ``q`` aborts the whole run, and anything else
        skips the issue.

        Args:
            schema: The JSON schema to refine
            loosen_counts: Enable fixes for exact counts
            migrate_deprecated: Enable migration of deprecated extensions
            round_numbers: Enable rounding of overly specific numbers

        Returns:
            RefinementResult with actions taken and refined schema. When the
            user quits, ``success`` is False and ``refined_schema`` is None.
            When an exception occurs, ``success`` is False and
            ``error_message`` holds the exception text.
        """
        result = RefinementResult(success=False)
        try:
            # Analyze the schema first
            analysis = self.analyzer.analyze_schema(schema)
            total = len(analysis.issues)
            print(f"\nFound {total} issue(s) to review\n")

            # Deep copy to avoid modifying original
            refined = copy.deepcopy(schema)

            # Process each issue interactively
            for i, issue in enumerate(analysis.issues, 1):
                print(f"Issue {i}/{total}")
                print(f" Type: {issue.issue_type.value}")
                print(f" Path: {issue.path}")
                print(f" {issue.message}")
                print(f" Suggestion: {issue.suggestion}")
                if issue.current_value is not None:
                    print(f" Current: {json.dumps(issue.current_value)}")
                if issue.suggested_value is not None:
                    print(f" Suggested: {json.dumps(issue.suggested_value)}")

                # Ask user if they want to apply the fix
                response = input("\nApply this fix? [y/N/q]: ").strip().lower()
                if response == 'q':
                    print("Refinement cancelled by user")
                    return result
                if response == 'y':
                    action = self._apply_fix(
                        refined, issue, loosen_counts, migrate_deprecated, round_numbers
                    )
                    if action:
                        result.actions_taken.append(action)
                        print(" ✓ Applied")
                    else:
                        print(" ✗ Could not apply fix")
                else:
                    print(" - Skipped")
                print()

            result.refined_schema = refined
            result.success = True
        except Exception as e:
            # Boundary of the public API: failures are reported through the
            # result object rather than raised.
            result.error_message = str(e)
        return result

    def refine_schema(
        self,
        schema: Dict[str, Any],
        loosen_counts: bool = True,
        migrate_deprecated: bool = False,
        round_numbers: bool = True
    ) -> RefinementResult:
        """
        Refine a schema by applying fixes for detected issues.

        Args:
            schema: The JSON schema to refine
            loosen_counts: Apply fixes for exact counts
            migrate_deprecated: Migrate deprecated extensions
            round_numbers: Round overly specific numbers

        Returns:
            RefinementResult with actions taken and refined schema. On an
            exception, ``success`` is False and ``error_message`` holds the
            exception text.
        """
        result = RefinementResult(success=False)
        try:
            # Analyze the schema first
            analysis = self.analyzer.analyze_schema(schema)

            # Deep copy to avoid modifying original
            refined = copy.deepcopy(schema)

            # Apply fixes based on issues found
            for issue in analysis.issues:
                action = self._apply_fix(
                    refined, issue, loosen_counts, migrate_deprecated, round_numbers
                )
                if action:
                    result.actions_taken.append(action)

            result.refined_schema = refined
            result.success = True
        except Exception as e:
            # Boundary of the public API: failures are reported through the
            # result object rather than raised.
            result.error_message = str(e)
        return result

    def _fix_exact_count(self, schema: Dict[str, Any], issue: SchemaIssue) -> Optional[RefinementAction]:
        """
        Fix exact count constraints by converting to ranges.

        Handles two shapes: an array whose ``minItems`` equals ``maxItems``,
        and a ``const`` value. The schema is modified in place.

        Returns:
            The action describing the change, or None if nothing applied.
        """
        nav_result = self._navigate_to_path(schema, issue.path)
        if not nav_result:
            return None
        obj, prop_name = nav_result
        prop_def = obj[prop_name]
        if not isinstance(prop_def, dict):
            return None

        # Check if it's an array with exact minItems/maxItems
        if prop_def.get("type") == "array":
            min_items = prop_def.get("minItems")
            max_items = prop_def.get("maxItems")
            if min_items is not None and max_items is not None and min_items == max_items:
                # Apply suggested loosening; the lower bound never goes below 0
                new_min = max(0, min_items - 2)
                new_max = min_items + 5
                prop_def["minItems"] = new_min
                prop_def["maxItems"] = new_max
                return RefinementAction(
                    issue_type=IssueType.EXACT_COUNT,
                    path=issue.path,
                    description=f"Loosened array count from exactly {min_items} to range {new_min}-{new_max}",
                    old_value={"minItems": min_items, "maxItems": max_items},
                    new_value={"minItems": new_min, "maxItems": new_max}
                )

        # Check if it's a const value
        if "const" in prop_def:
            const_value = prop_def.pop("const")
            # If it's an integer, convert to a range. bool is a subclass of
            # int in Python, so exclude it explicitly: `const: true` must not
            # turn into the numeric range 0-2.
            if isinstance(const_value, int) and not isinstance(const_value, bool):
                lower = const_value - 1
                upper = const_value + 1
                prop_def["minimum"] = lower
                prop_def["maximum"] = upper
                return RefinementAction(
                    issue_type=IssueType.EXACT_COUNT,
                    path=issue.path,
                    description=f"Converted const {const_value} to range {lower}-{upper}",
                    old_value=const_value,
                    new_value={"minimum": lower, "maximum": upper}
                )
            # For all other constants, just remove the constraint
            return RefinementAction(
                issue_type=IssueType.EXACT_COUNT,
                path=issue.path,
                description=f"Removed const constraint: {const_value}",
                old_value=const_value,
                new_value=None
            )
        return None

    def _fix_overly_specific(self, schema: Dict[str, Any], issue: SchemaIssue) -> Optional[RefinementAction]:
        """
        Fix overly specific number constraints by rounding.

        Replaces ``minItems`` in place with ``issue.suggested_value``.

        Returns:
            The action describing the change, or None when the issue carries
            no suggested value, the path is missing, or the target has no
            ``minItems``.
        """
        if issue.suggested_value is None:
            return None
        nav_result = self._navigate_to_path(schema, issue.path)
        if not nav_result:
            return None
        obj, prop_name = nav_result
        prop_def = obj[prop_name]

        # Round the minItems value
        if isinstance(prop_def, dict) and "minItems" in prop_def:
            old_value = prop_def["minItems"]
            new_value = issue.suggested_value
            prop_def["minItems"] = new_value
            return RefinementAction(
                issue_type=IssueType.OVERLY_SPECIFIC,
                path=issue.path,
                description=f"Rounded minItems from {old_value} to {new_value}",
                old_value=old_value,
                new_value=new_value
            )
        return None

    def _fix_no_flexibility(self, schema: Dict[str, Any], issue: SchemaIssue) -> Optional[RefinementAction]:
        """
        Fix narrow ranges by widening them.

        Moves ``minimum`` down by 5 and ``maximum`` up by 5, in place.

        Returns:
            The action describing the change, or None when the path is
            missing or the target lacks either bound.
        """
        nav_result = self._navigate_to_path(schema, issue.path)
        if not nav_result:
            return None
        obj, prop_name = nav_result
        prop_def = obj[prop_name]

        if isinstance(prop_def, dict) and "minimum" in prop_def and "maximum" in prop_def:
            old_min = prop_def["minimum"]
            old_max = prop_def["maximum"]
            # Widen the range
            new_min = old_min - 5
            new_max = old_max + 5
            prop_def["minimum"] = new_min
            prop_def["maximum"] = new_max
            return RefinementAction(
                issue_type=IssueType.NO_FLEXIBILITY,
                path=issue.path,
                description=f"Widened range from {old_min}-{old_max} to {new_min}-{new_max}",
                old_value={"minimum": old_min, "maximum": old_max},
                new_value={"minimum": new_min, "maximum": new_max}
            )
        return None

    def _fix_deprecated_extension(self, schema: Dict[str, Any], issue: SchemaIssue) -> Optional[RefinementAction]:
        """
        Report a deprecated extension (migration requires manual work).

        The schema is NOT modified: the key is only looked up at the top
        level of the schema and recorded so the report can flag it.

        Returns:
            An action carrying the deprecated value, or None when the key is
            not present at the top level of the schema.
        """
        # For now, just document that manual migration is needed
        # Full migration would require understanding the old format
        deprecated_key = issue.path
        if deprecated_key in schema:
            # Don't actually remove it automatically - too risky
            return RefinementAction(
                issue_type=IssueType.DEPRECATED_EXTENSIONS,
                path=issue.path,
                description="Detected deprecated extension (manual migration recommended)",
                old_value=schema[deprecated_key],
                new_value=None
            )
        return None

    def refine_schema_file(
        self,
        input_path: Path,
        output_path: Optional[Path] = None,
        loosen_counts: bool = True,
        migrate_deprecated: bool = False,
        round_numbers: bool = True
    ) -> RefinementResult:
        """
        Refine a schema file.

        Args:
            input_path: Path to input schema file
            output_path: Path to output file (if None, overwrites input)
            loosen_counts: Apply fixes for exact counts
            migrate_deprecated: Migrate deprecated extensions
            round_numbers: Round overly specific numbers

        Returns:
            RefinementResult

        Raises:
            OSError: If the input cannot be read or the output cannot be
                written (includes FileNotFoundError).
            json.JSONDecodeError: If the input file is not valid JSON.
        """
        # JSON text is UTF-8; don't depend on the platform default encoding.
        with open(input_path, encoding="utf-8") as f:
            schema = json.load(f)

        result = self.refine_schema(
            schema,
            loosen_counts=loosen_counts,
            migrate_deprecated=migrate_deprecated,
            round_numbers=round_numbers
        )

        # Compare against None: an empty schema ({}) is falsy but still a
        # valid result that must be written.
        if result.success and result.refined_schema is not None:
            target_path = output_path or input_path
            with open(target_path, 'w', encoding="utf-8") as f:
                json.dump(result.refined_schema, f, indent=2)
        return result

    def format_refinement_report(self, result: RefinementResult) -> str:
        """
        Format refinement results as a human-readable report.

        Args:
            result: Refinement results

        Returns:
            Formatted report string
        """
        rule = "=" * 70
        lines = [rule, "Schema Refinement Report", rule, ""]

        if not result.success:
            # A failed result may carry no message (e.g. a cancelled run);
            # avoid printing the literal text "None".
            reason = result.error_message or "unknown error"
            lines.append(f"❌ Refinement failed: {reason}")
            return "\n".join(lines)

        # Summary
        action_count = len(result.actions_taken)
        if action_count == 0:
            lines.append("✅ No refinements needed - schema is already flexible")
        else:
            lines.append(f"✅ Applied {action_count} refinement(s)")
        lines.append("")

        # List actions
        if result.actions_taken:
            lines.append("Actions Taken:")
            lines.append("-" * 70)
            for i, action in enumerate(result.actions_taken, 1):
                lines.append(f"{i}. {action.description}")
                lines.append(f" Path: {action.path}")
                if action.old_value is not None:
                    lines.append(f" Before: {json.dumps(action.old_value)}")
                if action.new_value is not None:
                    lines.append(f" After: {json.dumps(action.new_value)}")
                lines.append("")
        return "\n".join(lines)
def refine_schema_cli(
    schema_path: str,
    output: Optional[str] = None,
    loosen_counts: bool = True,
    migrate_deprecated: bool = False,
    round_numbers: bool = True,
    dry_run: bool = False,
    interactive: bool = False
) -> int:
    """
    CLI entry point for schema refinement.

    Three modes are supported: interactive (prompt per fix), dry run
    (report only, nothing written) and the default batch mode (refine and
    write the file). All messages are printed to standard output.

    Args:
        schema_path: Path to schema file
        output: Output path (None = overwrite input)
        loosen_counts: Apply count loosening fixes
        migrate_deprecated: Migrate deprecated extensions
        round_numbers: Round overly specific numbers
        dry_run: Show changes without applying
        interactive: Prompt for each fix

    Returns:
        Exit code (0 = success, 1 = no changes needed, 2 = error)
    """
    refiner = SchemaRefiner()
    try:
        input_path = Path(schema_path)
        output_path = Path(output) if output else None

        if interactive or dry_run:
            # These modes refine an in-memory schema, so load it here. The
            # batch mode below lets refine_schema_file do its own loading.
            with open(input_path, encoding="utf-8") as f:
                schema = json.load(f)

        if interactive:
            # Interactive mode - prompt for each fix
            print(f"Refining schema: {schema_path}")
            if dry_run:
                # Tell the user up front that approvals will not be saved.
                print("DRY RUN - No changes will be made")
            result = refiner.refine_schema_interactive(
                schema,
                loosen_counts=loosen_counts,
                migrate_deprecated=migrate_deprecated,
                round_numbers=round_numbers
            )
            # Compare against None: an empty schema ({}) is falsy but valid.
            if result.success and result.refined_schema is not None and not dry_run:
                # Write the refined schema
                target_path = output_path or input_path
                with open(target_path, 'w', encoding="utf-8") as f:
                    json.dump(result.refined_schema, f, indent=2)
                print(f"\nRefined schema written to: {target_path}")
        elif dry_run:
            # Just analyze and show what would be done
            result = refiner.refine_schema(
                schema,
                loosen_counts=loosen_counts,
                migrate_deprecated=migrate_deprecated,
                round_numbers=round_numbers
            )
            print("DRY RUN - No changes will be made")
            print()
        else:
            result = refiner.refine_schema_file(
                input_path,
                output_path,
                loosen_counts=loosen_counts,
                migrate_deprecated=migrate_deprecated,
                round_numbers=round_numbers
            )

        # Only print full report if not in interactive mode (user already saw changes)
        if not interactive:
            print(refiner.format_refinement_report(result))
        elif result.success:
            # Just print summary for interactive mode
            print(f"\n{'='*70}")
            print(f"Refinement complete: {len(result.actions_taken)} change(s) applied")
            print(f"{'='*70}")
        elif result.error_message:
            # An interactive run that failed with an exception would
            # otherwise exit with code 2 and no explanation.
            print(f"Error: {result.error_message}")

        if not result.success:
            return 2  # Error
        if result.actions_taken:
            return 0  # Success with changes
        return 1  # Success but no changes needed
    except FileNotFoundError:
        print(f"Error: Schema file not found: {schema_path}")
        return 2
    except json.JSONDecodeError as e:
        print(f"Error: Invalid JSON in schema file: {e}")
        return 2
    except Exception as e:
        # Top-level CLI boundary: turn any other failure into exit code 2.
        print(f"Error: {e}")
        return 2