Phase 0 - Project Organization: - Create docs/PROJECT_STRUCTURE.md documenting codebase layout - Create markitect/core/ with parser, serializer, document_manager, workspace - Create markitect/schema/ consolidating 6 schema_*.py modules - Create markitect/storage/ with database module - Maintain backward compatibility via re-exports from original locations - Add docs/roadmap/information-space-service/ with README and WORKPLAN Phase 1 - Foundation (Weeks 1-3): - Week 1: Core domain models (InformationSpace, SpaceDocument, SpaceConfig, SpaceMetadata, SpaceVariable, TransclusionReference, SpaceStatus) - Week 2: Repository layer with interfaces (ISpaceRepository, IDocumentAssociationRepository, IVariableRepository, IReferenceRepository) and SQLite implementations with foreign key cascade deletes - Week 3: SpaceService orchestration layer with full CRUD, document, variable, and reference tracking operations Test coverage: 124 tests (25 model + 63 repository + 36 integration) Capabilities delivered: - CAP-001: InformationSpace entity with lifecycle management - CAP-002: SpaceRepository CRUD with SQLite backing - CAP-003: Document-Space associations with path-based organization - CAP-004: Space metadata and configuration schemas - CAP-005: Database schema with migrations Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
531 lines
18 KiB
Python
531 lines
18 KiB
Python
"""
|
|
Schema Refiner for Phase 2: Schema Refinement Tools
|
|
|
|
Automatically refines rigid schemas by applying loosening rules and fixes.
|
|
"""
|
|
|
|
from pathlib import Path
|
|
from typing import Dict, Any, List, Optional, Tuple
|
|
import json
|
|
import copy
|
|
from dataclasses import dataclass, field
|
|
|
|
from .analyzer import SchemaAnalyzer, SchemaIssue, IssueType, IssueSeverity
|
|
|
|
|
|
@dataclass
class RefinementAction:
    """Record of a single refinement applied (or reported) on a schema.

    Attributes:
        issue_type: Kind of issue this action addresses.
        path: Dotted path of the schema element the action refers to.
        description: Human-readable summary of what was done.
        old_value: Value before the change, or None when not recorded.
        new_value: Value after the change, or None when nothing replaced it.
    """

    issue_type: IssueType
    path: str
    description: str
    old_value: Any = field(default=None)
    new_value: Any = field(default=None)
|
|
|
|
|
|
@dataclass
class RefinementResult:
    """Outcome of one schema refinement run.

    Attributes:
        success: True when the run completed without raising.
        actions_taken: Actions recorded during the run, in the order applied.
        refined_schema: The refined copy of the schema, or None when the run
            did not finish.
        error_message: Text of the error that stopped the run, if any.
    """

    success: bool
    actions_taken: List[RefinementAction] = field(default_factory=list)
    refined_schema: Optional[Dict[str, Any]] = field(default=None)
    error_message: Optional[str] = field(default=None)
|
|
|
|
|
|
class SchemaRefiner:
    """Refines rigid schemas by applying loosening rules.

    Every ``refine_*`` entry point analyzes the schema with the analyzer held
    in ``self.analyzer`` and applies fixes to a deep copy, so the schema
    object passed in by the caller is never mutated.
    """

    def __init__(self):
        """Initialize the schema refiner."""
        self.analyzer = SchemaAnalyzer()

    @staticmethod
    def _container_for(node: Any, key: str) -> Optional[Dict[str, Any]]:
        """Return the dict under ``node`` that directly holds ``key``.

        ``node`` itself is preferred; its ``"properties"`` mapping is the
        fallback. Returns None when ``node`` is not a dict or ``key`` is found
        in neither place.
        """
        # Only dicts are navigable: `key in node` on a str/list would do a
        # substring/element test, and on a scalar it would raise TypeError.
        if not isinstance(node, dict):
            return None
        if key in node:
            return node
        properties = node.get("properties")
        if isinstance(properties, dict) and key in properties:
            return properties
        return None

    def _navigate_to_path(self, schema: Dict[str, Any], path: str) -> Optional[Tuple[Dict[str, Any], str]]:
        """
        Navigate to a path in the schema, handling nested 'properties' objects.

        Args:
            schema: The schema (or sub-schema) to search.
            path: Dot-separated path; each segment is looked up directly on
                the current object first and then inside its 'properties'.

        Returns:
            (parent_object, property_name) such that
            ``parent_object[property_name]`` is the addressed element, or
            None if the path is empty or doesn't exist.
        """
        if not path:
            return None

        *parent_parts, prop_name = path.split('.')
        node: Any = schema

        # Walk through all but the last segment.
        for part in parent_parts:
            container = self._container_for(node, part)
            if container is None:
                return None
            node = container[part]

        # The last segment is returned together with the dict that holds it
        # so callers can mutate the element in place.
        container = self._container_for(node, prop_name)
        if container is None:
            return None
        return (container, prop_name)

    def _apply_fix(
        self,
        schema: Dict[str, Any],
        issue: "SchemaIssue",
        loosen_counts: bool,
        migrate_deprecated: bool,
        round_numbers: bool
    ) -> Optional["RefinementAction"]:
        """Run the fixer matching ``issue.issue_type`` if its category is enabled.

        Returns the fixer's RefinementAction, or None when the category is
        disabled, the issue type has no fixer, or the fixer could not apply.
        """
        handlers = (
            (loosen_counts, IssueType.EXACT_COUNT, self._fix_exact_count),
            (round_numbers, IssueType.OVERLY_SPECIFIC, self._fix_overly_specific),
            (loosen_counts, IssueType.NO_FLEXIBILITY, self._fix_no_flexibility),
            (migrate_deprecated, IssueType.DEPRECATED_EXTENSIONS, self._fix_deprecated_extension),
        )
        for enabled, issue_type, fixer in handlers:
            if enabled and issue.issue_type == issue_type:
                return fixer(schema, issue)
        return None

    def refine_schema_interactive(
        self,
        schema: Dict[str, Any],
        loosen_counts: bool = True,
        migrate_deprecated: bool = False,
        round_numbers: bool = True
    ) -> "RefinementResult":
        """
        Refine a schema interactively, prompting for each fix.

        Each issue is printed to stdout and the user answers via stdin:
        'y' applies the fix, 'q' cancels the whole run, anything else skips.

        Args:
            schema: The JSON schema to refine
            loosen_counts: Enable fixes for exact counts
            migrate_deprecated: Enable migration of deprecated extensions
            round_numbers: Enable rounding of overly specific numbers

        Returns:
            RefinementResult with actions taken and refined schema. When the
            user cancels, ``success`` is False, ``refined_schema`` is None and
            ``error_message`` records the cancellation.
        """
        result = RefinementResult(success=False)

        try:
            # Analyze the schema first
            analysis = self.analyzer.analyze_schema(schema)
            total = len(analysis.issues)

            print(f"\nFound {total} issue(s) to review\n")

            # Deep copy to avoid modifying original
            refined = copy.deepcopy(schema)

            # Process each issue interactively
            for i, issue in enumerate(analysis.issues, 1):
                print(f"Issue {i}/{total}")
                print(f" Type: {issue.issue_type.value}")
                print(f" Path: {issue.path}")
                print(f" {issue.message}")
                print(f" Suggestion: {issue.suggestion}")

                if issue.current_value is not None:
                    print(f" Current: {json.dumps(issue.current_value)}")
                if issue.suggested_value is not None:
                    print(f" Suggested: {json.dumps(issue.suggested_value)}")

                # Ask user if they want to apply the fix
                response = input("\nApply this fix? [y/N/q]: ").strip().lower()

                if response == 'q':
                    print("Refinement cancelled by user")
                    result.success = False
                    # Record why the run failed so callers don't see a bare None.
                    result.error_message = "Refinement cancelled by user"
                    return result
                elif response == 'y':
                    action = self._apply_fix(
                        refined,
                        issue,
                        loosen_counts=loosen_counts,
                        migrate_deprecated=migrate_deprecated,
                        round_numbers=round_numbers
                    )

                    if action:
                        result.actions_taken.append(action)
                        print(" ✓ Applied")
                    else:
                        print(" ✗ Could not apply fix")
                else:
                    print(" - Skipped")

                print()

            result.refined_schema = refined
            result.success = True

        except Exception as e:
            # Boundary handler: failures are reported through the result
            # object instead of propagating to the caller.
            result.error_message = str(e)

        return result

    def refine_schema(
        self,
        schema: Dict[str, Any],
        loosen_counts: bool = True,
        migrate_deprecated: bool = False,
        round_numbers: bool = True
    ) -> "RefinementResult":
        """
        Refine a schema by applying fixes for detected issues.

        Args:
            schema: The JSON schema to refine
            loosen_counts: Apply fixes for exact counts
            migrate_deprecated: Migrate deprecated extensions
            round_numbers: Round overly specific numbers

        Returns:
            RefinementResult with actions taken and refined schema. On any
            exception ``success`` stays False and ``error_message`` holds the
            exception text.
        """
        result = RefinementResult(success=False)

        try:
            # Analyze the schema first
            analysis = self.analyzer.analyze_schema(schema)

            # Deep copy to avoid modifying original
            refined = copy.deepcopy(schema)

            # Apply fixes based on issues found
            for issue in analysis.issues:
                action = self._apply_fix(
                    refined,
                    issue,
                    loosen_counts=loosen_counts,
                    migrate_deprecated=migrate_deprecated,
                    round_numbers=round_numbers
                )
                if action:
                    result.actions_taken.append(action)

            result.refined_schema = refined
            result.success = True

        except Exception as e:
            # Boundary handler: failures are reported through the result
            # object instead of propagating to the caller.
            result.error_message = str(e)

        return result

    def _fix_exact_count(self, schema: Dict[str, Any], issue: "SchemaIssue") -> Optional["RefinementAction"]:
        """Fix exact count constraints by converting to ranges.

        Handles two shapes at ``issue.path``:
        - an array whose ``minItems`` equals ``maxItems``: widened to
          ``max(0, n - 2)`` .. ``n + 5``;
        - a ``const``: an integer becomes ``minimum``/``maximum`` of
          ``n - 1`` .. ``n + 1``; any other constant is simply removed.

        Returns the recorded action, or None if the path cannot be resolved
        or neither shape matches.
        """
        nav_result = self._navigate_to_path(schema, issue.path)
        if not nav_result:
            return None

        obj, prop_name = nav_result
        prop_def = obj[prop_name]
        if not isinstance(prop_def, dict):
            return None

        # Check if it's an array with exact minItems/maxItems
        if prop_def.get("type") == "array":
            min_items = prop_def.get("minItems")
            max_items = prop_def.get("maxItems")

            if min_items is not None and max_items is not None and min_items == max_items:
                new_min = max(0, min_items - 2)
                new_max = min_items + 5

                prop_def["minItems"] = new_min
                prop_def["maxItems"] = new_max

                return RefinementAction(
                    issue_type=IssueType.EXACT_COUNT,
                    path=issue.path,
                    description=f"Loosened array count from exactly {min_items} to range {new_min}-{new_max}",
                    old_value={"minItems": min_items, "maxItems": max_items},
                    new_value={"minItems": new_min, "maxItems": new_max}
                )

        # Check if it's a const value
        if "const" in prop_def:
            const_value = prop_def.pop("const")

            # bool is a subclass of int; `const: true` must not be turned
            # into a numeric range.
            if isinstance(const_value, int) and not isinstance(const_value, bool):
                lower = const_value - 1
                upper = const_value + 1
                prop_def["minimum"] = lower
                prop_def["maximum"] = upper

                return RefinementAction(
                    issue_type=IssueType.EXACT_COUNT,
                    path=issue.path,
                    description=f"Converted const {const_value} to range {lower}-{upper}",
                    old_value=const_value,
                    new_value={"minimum": lower, "maximum": upper}
                )

            # For non-numeric constants, just remove the constraint
            return RefinementAction(
                issue_type=IssueType.EXACT_COUNT,
                path=issue.path,
                description=f"Removed const constraint: {const_value}",
                old_value=const_value,
                new_value=None
            )

        return None

    def _fix_overly_specific(self, schema: Dict[str, Any], issue: "SchemaIssue") -> Optional["RefinementAction"]:
        """Fix overly specific number constraints by rounding.

        Replaces ``minItems`` at ``issue.path`` with ``issue.suggested_value``.
        Returns None when there is no suggested value, the path cannot be
        resolved, or the element has no ``minItems``.
        """
        if issue.suggested_value is None:
            return None

        nav_result = self._navigate_to_path(schema, issue.path)
        if not nav_result:
            return None

        obj, prop_name = nav_result
        prop_def = obj[prop_name]

        if not (isinstance(prop_def, dict) and "minItems" in prop_def):
            return None

        old_value = prop_def["minItems"]
        new_value = issue.suggested_value
        prop_def["minItems"] = new_value

        return RefinementAction(
            issue_type=IssueType.OVERLY_SPECIFIC,
            path=issue.path,
            description=f"Rounded minItems from {old_value} to {new_value}",
            old_value=old_value,
            new_value=new_value
        )

    def _fix_no_flexibility(self, schema: Dict[str, Any], issue: "SchemaIssue") -> Optional["RefinementAction"]:
        """Fix narrow ranges by widening them.

        When the element at ``issue.path`` has both ``minimum`` and
        ``maximum``, each bound is moved outward by 5. Returns None if the
        path cannot be resolved or either bound is missing.
        """
        nav_result = self._navigate_to_path(schema, issue.path)
        if not nav_result:
            return None

        obj, prop_name = nav_result
        prop_def = obj[prop_name]

        if not (isinstance(prop_def, dict) and "minimum" in prop_def and "maximum" in prop_def):
            return None

        old_min = prop_def["minimum"]
        old_max = prop_def["maximum"]

        # Widen the range
        new_min = old_min - 5
        new_max = old_max + 5

        prop_def["minimum"] = new_min
        prop_def["maximum"] = new_max

        return RefinementAction(
            issue_type=IssueType.NO_FLEXIBILITY,
            path=issue.path,
            description=f"Widened range from {old_min}-{old_max} to {new_min}-{new_max}",
            old_value={"minimum": old_min, "maximum": old_max},
            new_value={"minimum": new_min, "maximum": new_max}
        )

    def _fix_deprecated_extension(self, schema: Dict[str, Any], issue: "SchemaIssue") -> Optional["RefinementAction"]:
        """Report a deprecated extension (migration requires manual work).

        The schema is NOT modified: automatic removal is considered too risky
        and a full migration would require understanding the old format.
        ``issue.path`` is looked up as a top-level key of ``schema``; an
        action is returned only when that key is present.
        """
        deprecated_key = issue.path
        if deprecated_key not in schema:
            return None

        return RefinementAction(
            issue_type=IssueType.DEPRECATED_EXTENSIONS,
            path=issue.path,
            description="Detected deprecated extension (manual migration recommended)",
            old_value=schema[deprecated_key],
            new_value=None
        )

    def refine_schema_file(
        self,
        input_path: Path,
        output_path: Optional[Path] = None,
        loosen_counts: bool = True,
        migrate_deprecated: bool = False,
        round_numbers: bool = True
    ) -> "RefinementResult":
        """
        Refine a schema file.

        Args:
            input_path: Path to input schema file
            output_path: Path to output file (if None, overwrites input)
            loosen_counts: Apply fixes for exact counts
            migrate_deprecated: Migrate deprecated extensions
            round_numbers: Round overly specific numbers

        Returns:
            RefinementResult

        Raises:
            OSError: If the input cannot be read or the output cannot be written.
            json.JSONDecodeError: If the input file is not valid JSON.
        """
        # JSON text is UTF-8; don't depend on the platform default encoding.
        with open(input_path, encoding="utf-8") as f:
            schema = json.load(f)

        result = self.refine_schema(
            schema,
            loosen_counts=loosen_counts,
            migrate_deprecated=migrate_deprecated,
            round_numbers=round_numbers
        )

        # Compare against None so an empty ({}) schema is still written out.
        if result.success and result.refined_schema is not None:
            target = output_path or input_path
            with open(target, 'w', encoding="utf-8") as f:
                json.dump(result.refined_schema, f, indent=2)

        return result

    def format_refinement_report(self, result: "RefinementResult") -> str:
        """
        Format refinement results as a human-readable report.

        Args:
            result: Refinement results

        Returns:
            Formatted report string. A failed result yields only the header
            followed by the failure line.
        """
        rule = "=" * 70
        lines = [rule, "Schema Refinement Report", rule, ""]

        if not result.success:
            lines.append(f"❌ Refinement failed: {result.error_message}")
            return "\n".join(lines)

        # Summary
        action_count = len(result.actions_taken)
        if action_count == 0:
            lines.append("✅ No refinements needed - schema is already flexible")
        else:
            lines.append(f"✅ Applied {action_count} refinement(s)")
        lines.append("")

        # List actions
        if result.actions_taken:
            lines.append("Actions Taken:")
            lines.append("-" * 70)

            for i, action in enumerate(result.actions_taken, 1):
                lines.append(f"{i}. {action.description}")
                lines.append(f" Path: {action.path}")

                if action.old_value is not None:
                    lines.append(f" Before: {json.dumps(action.old_value)}")
                if action.new_value is not None:
                    lines.append(f" After: {json.dumps(action.new_value)}")

                lines.append("")

        return "\n".join(lines)
|
|
|
|
|
|
def refine_schema_cli(
    schema_path: str,
    output: Optional[str] = None,
    loosen_counts: bool = True,
    migrate_deprecated: bool = False,
    round_numbers: bool = True,
    dry_run: bool = False,
    interactive: bool = False
) -> int:
    """
    CLI entry point for schema refinement.

    Three modes are supported: interactive (prompt per fix, then write unless
    ``dry_run``), dry run (compute and report fixes without writing), and the
    default (refine the file and write the result).

    Args:
        schema_path: Path to schema file
        output: Output path (None = overwrite input)
        loosen_counts: Apply count loosening fixes
        migrate_deprecated: Migrate deprecated extensions
        round_numbers: Round overly specific numbers
        dry_run: Show changes without applying
        interactive: Prompt for each fix

    Returns:
        Exit code (0 = success, 1 = no changes needed, 2 = error)
    """
    refiner = SchemaRefiner()

    try:
        input_path = Path(schema_path)
        output_path = Path(output) if output else None

        # Load schema up front so a missing or invalid file is reported the
        # same way in every mode. JSON text is UTF-8, so don't rely on the
        # platform default encoding.
        with open(input_path, encoding="utf-8") as f:
            schema = json.load(f)

        if interactive:
            # Interactive mode - prompt for each fix
            print(f"Refining schema: {schema_path}")
            result = refiner.refine_schema_interactive(
                schema,
                loosen_counts=loosen_counts,
                migrate_deprecated=migrate_deprecated,
                round_numbers=round_numbers
            )

            # Compare against None so an empty ({}) schema is still written.
            if result.success and result.refined_schema is not None and not dry_run:
                # Use a separate name so the `output` argument is not rebound.
                target = output_path or input_path
                with open(target, 'w', encoding="utf-8") as f:
                    json.dump(result.refined_schema, f, indent=2)
                print(f"\nRefined schema written to: {target}")

        elif dry_run:
            # Just analyze and show what would be done
            result = refiner.refine_schema(
                schema,
                loosen_counts=loosen_counts,
                migrate_deprecated=migrate_deprecated,
                round_numbers=round_numbers
            )

            print("DRY RUN - No changes will be made")
            print()
        else:
            result = refiner.refine_schema_file(
                input_path,
                output_path,
                loosen_counts=loosen_counts,
                migrate_deprecated=migrate_deprecated,
                round_numbers=round_numbers
            )

        # Only print full report if not in interactive mode (user already saw changes)
        if not interactive:
            print(refiner.format_refinement_report(result))
        elif result.success:
            # Just print summary for interactive mode
            print(f"\n{'='*70}")
            print(f"Refinement complete: {len(result.actions_taken)} change(s) applied")
            print(f"{'='*70}")

        if not result.success:
            return 2  # Error
        if result.actions_taken:
            return 0  # Success with changes
        return 1  # Success but no changes needed

    except FileNotFoundError:
        print(f"Error: Schema file not found: {schema_path}")
        return 2
    except json.JSONDecodeError as e:
        print(f"Error: Invalid JSON in schema file: {e}")
        return 2
    except Exception as e:
        # Top-level CLI boundary: turn any remaining failure into exit code 2.
        print(f"Error: {e}")
        return 2
|