"""
Data-driven Draft Generator for Issue #56: Generate multiple drafts from data sources.

This module provides functionality to create multiple markdown documents from
JSON schemas and data sources (JSON, CSV) with field mapping support.

Examples:
    Basic usage with JSON data:
    >>> generator = DraftGenerator()
    >>> schema = {...}  # JSON schema with field mappings
    >>> data = [{"name": "John", "role": "Developer"}]
    >>> files = generator.generate_drafts_from_data_source(
    ...     schema, data, Path("./output")
    ... )

    Using with CSV file:
    >>> files = generator.generate_drafts_from_data_source(
    ...     schema, Path("data.csv"), Path("./output")
    ... )

Field mapping is configured in the schema using x-markitect-field-mapping extension:
    {
        "properties": {
            "headings": {
                "properties": {
                    "level_1": {
                        "x-markitect-field-mapping": {"const": "name"}
                    }
                }
            }
        }
    }

Architecture:
    The DraftGenerator wraps an instance of the existing StubGenerator
    (composition, not inheritance) to add data-driven capabilities. It
    processes data sources, validates compatibility with schemas, and
    generates multiple document drafts with populated content.
"""

import json
import csv
import io
import copy
from collections.abc import Mapping
from pathlib import Path
from typing import Dict, Any, List, Optional, Union

from .stub_generator import StubGenerator


class DraftGenerator:
    """
    Generates multiple markdown drafts from schemas and data sources.

    Creates markdown documents by combining schema templates with data from
    JSON or CSV sources using field mapping configurations.
    """

    # Characters replaced by '_' when a data value is turned into a filename.
    # Built once so sanitizing is a single translate() pass per filename.
    _UNSAFE_FILENAME_TABLE = str.maketrans(dict.fromkeys(' /\\:*?"<>|', '_'))

    def __init__(self):
        """Initialize the draft generator."""
        self.stub_generator = StubGenerator()

    def generate_drafts_from_data_source(
        self,
        schema: Dict[str, Any],
        data_source: Union[str, Path, List[Dict[str, Any]]],
        output_dir: Path,
        schema_file_path: Optional[str] = None,
    ) -> List[Path]:
        """
        Generate multiple drafts from a schema and data source.

        One markdown file is written per data record. If several records
        resolve to the same filename, later ones get a numeric suffix
        (``name_2.md``, ``name_3.md``, ...) instead of overwriting earlier
        drafts from the same run.

        Args:
            schema: JSON schema dictionary
            data_source: Path to JSON/CSV file or list of data records
            output_dir: Directory to save generated files (created if missing)
            schema_file_path: Optional path to schema file for reference

        Returns:
            List of paths to generated draft files, in record order

        Raises:
            ValueError: If the data source type/format is unsupported, it
                contains no records, a record is not a mapping, or a record
                lacks a field required by the schema
            FileNotFoundError: If data source file doesn't exist
        """
        # Parse data source
        if isinstance(data_source, (str, Path)):
            data_records = self._load_data_from_file(Path(data_source))
        elif isinstance(data_source, list):
            data_records = data_source
        else:
            raise ValueError(f"Unsupported data source type: {type(data_source)}")

        # Validate data compatibility with schema
        self._validate_data_schema_compatibility(data_records, schema)

        # Accept plain strings too, mirroring what data_source already allows
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        generated_files = []
        used_names = set()
        for i, record in enumerate(data_records):
            # Apply field mapping to populate schema content
            populated_schema = self._apply_field_mapping(schema, record)

            # Generate filename based on data or index, unique within this run
            filename = self._unique_filename(
                self._generate_filename(record, i), used_names
            )
            output_file = output_dir / filename

            # Generate draft content using populated schema
            draft_content = self._generate_draft_content(
                populated_schema, record, schema_file_path
            )

            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(draft_content)

            generated_files.append(output_file)

        return generated_files

    def _unique_filename(self, filename: str, used_names: set) -> str:
        """
        Return ``filename``, suffixed if needed so it is unused in this run.

        Args:
            filename: Candidate filename (e.g. ``"John.md"``)
            used_names: Case-folded names already handed out; updated in place

        Returns:
            ``filename`` itself when unused, otherwise ``<stem>_<n><ext>``
            with the smallest ``n >= 2`` that is still free
        """
        stem, dot, extension = filename.rpartition('.')
        if not dot:
            # No extension at all: treat the whole name as the stem
            stem, extension = filename, ''

        candidate = filename
        counter = 2
        # Compare case-insensitively: on case-insensitive filesystems
        # "John.md" and "john.md" are the same file.
        while candidate.casefold() in used_names:
            candidate = f"{stem}_{counter}{dot}{extension}"
            counter += 1

        used_names.add(candidate.casefold())
        return candidate

    def _load_data_from_file(self, file_path: Path) -> List[Dict[str, Any]]:
        """
        Load data records from JSON or CSV file.

        The format is chosen from the file extension (case-insensitive).
        A JSON file holding a single value is wrapped in a one-element list.

        Args:
            file_path: Path to a ``.json`` or ``.csv`` file

        Returns:
            List of records read from the file

        Raises:
            FileNotFoundError: If ``file_path`` does not exist
            ValueError: If the extension is neither ``.json`` nor ``.csv``
        """
        if not file_path.exists():
            raise FileNotFoundError(f"Data source file not found: {file_path}")

        suffix = file_path.suffix.lower()

        if suffix == '.json':
            # utf-8-sig reads plain UTF-8 and also strips a leading BOM,
            # which json.load would otherwise reject.
            with open(file_path, 'r', encoding='utf-8-sig') as f:
                data = json.load(f)
            # Handle both single objects and arrays
            return data if isinstance(data, list) else [data]

        if suffix == '.csv':
            # newline='' is required by the csv module so that newlines
            # embedded in quoted fields are parsed correctly; utf-8-sig keeps
            # a BOM from being glued onto the first column header.
            with open(file_path, 'r', encoding='utf-8-sig', newline='') as f:
                return list(csv.DictReader(f))

        raise ValueError(f"Unsupported data source format: {file_path.suffix}")

    def _validate_data_schema_compatibility(
        self, data_records: List[Dict[str, Any]], schema: Dict[str, Any]
    ) -> None:
        """
        Validate that data records are compatible with schema field mappings.

        Every record must be a mapping and must contain each data field
        referenced by an ``x-markitect-field-mapping`` in the schema as well
        as each field listed under the schema's top-level
        ``x-markitect-required-fields`` key.

        Args:
            data_records: Records to check
            schema: JSON schema dictionary

        Raises:
            ValueError: If there are no records, a record is not a mapping,
                or a record lacks a mapped or explicitly required field
        """
        if not data_records:
            raise ValueError("Data source contains no records")

        # Data field names referenced by mappings anywhere in the schema
        mapped_fields = list(self._extract_field_mappings(schema).values())

        # Explicit required fields; a null value is treated as "none"
        required_fields = schema.get('x-markitect-required-fields') or []

        for record in data_records:
            if not isinstance(record, Mapping):
                # e.g. a JSON file holding a list of scalars: fail with the
                # documented exception type instead of an opaque TypeError
                raise ValueError(
                    "Data record must be a mapping of field names to values, "
                    f"got {type(record).__name__}: {record!r}"
                )

            # Check if all mapped fields exist in the record
            for field_name in mapped_fields:
                if field_name not in record:
                    raise ValueError(
                        f"Required field '{field_name}' not found in data record: {record}"
                    )

            # Check explicit required fields
            for required_field in required_fields:
                if required_field not in record:
                    raise ValueError(
                        f"Required field '{required_field}' not found in data record: {record}"
                    )

    def _extract_field_mappings(self, schema: Dict[str, Any]) -> Dict[str, str]:
        """
        Extract field mappings from schema extensions.

        Walks ``properties`` recursively (including ``items.properties`` of
        arrays) and collects every ``x-markitect-field-mapping``. A mapping
        may be a plain string or a dict with a ``const`` key.

        Args:
            schema: JSON schema dictionary

        Returns:
            Dict from dotted schema path (array item levels are marked with
            a ``[]`` suffix) to the mapped data field name
        """
        mappings = {}

        def extract_from_properties(properties: Any, path: str = "") -> None:
            if not isinstance(properties, dict):
                # Malformed schema node; nothing to collect here
                return

            for key, value in properties.items():
                if not isinstance(value, dict):
                    continue

                current_path = f"{path}.{key}" if path else key

                # Check for field mapping extension
                if 'x-markitect-field-mapping' in value:
                    mapping = value['x-markitect-field-mapping']
                    if isinstance(mapping, dict) and 'const' in mapping:
                        mappings[current_path] = mapping['const']
                    elif isinstance(mapping, str):
                        mappings[current_path] = mapping

                # Recursively check nested properties
                if 'properties' in value:
                    extract_from_properties(value['properties'], current_path)

                # Handle array items
                items = value.get('items')
                if isinstance(items, dict) and 'properties' in items:
                    extract_from_properties(items['properties'], f"{current_path}[]")

        # Start extraction from root properties
        if 'properties' in schema:
            extract_from_properties(schema['properties'])

        return mappings

    def _apply_field_mapping(
        self, schema: Dict[str, Any], record: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Apply field mapping to populate schema content areas with data.

        Currently the only population performed is copying the record's
        ``name`` value (when present) into the schema's ``title``.

        Args:
            schema: JSON schema dictionary; left unmodified
            record: Data record

        Returns:
            A deep copy of ``schema`` with the title populated if applicable
        """
        # Deep copy so the caller's schema can be reused for the next record
        populated_schema = copy.deepcopy(schema)

        # Apply title mapping if exists
        if 'name' in record:
            populated_schema['title'] = record['name']

        return populated_schema

    def _generate_filename(self, record: Dict[str, Any], index: int) -> str:
        """
        Generate appropriate filename for the draft.

        Uses the first truthy value among the record's ``name``, ``title``
        and ``id`` fields (sanitized); otherwise falls back to a 1-based,
        zero-padded index name such as ``draft_001.md``.

        Args:
            record: Data record
            index: 0-based position of the record in the data source

        Returns:
            Filename ending in ``.md``
        """
        # Try to use common identifying fields
        for field in ('name', 'title', 'id'):
            if field in record and record[field]:
                return f"{self._sanitize_filename(str(record[field]))}.md"

        # Fall back to index-based naming
        return f"draft_{index + 1:03d}.md"

    def _sanitize_filename(self, filename: str) -> str:
        """
        Sanitize a string to be safe for use as a filename.

        Args:
            filename: Raw text to turn into a filename component

        Returns:
            ``filename`` with each space, path separator and each of
            ``: * ? " < > |`` replaced by an underscore
        """
        return filename.translate(self._UNSAFE_FILENAME_TABLE)

    def _generate_draft_content(
        self,
        schema: Dict[str, Any],
        record: Dict[str, Any],
        schema_file_path: Optional[str] = None,
    ) -> str:
        """
        Generate the actual draft content from populated schema.

        Args:
            schema: Populated schema for this record
            record: Data record used to fill placeholders
            schema_file_path: Optional schema path passed through to the
                stub generator

        Returns:
            Draft text: the stub generator's output with data replacements
            applied
        """
        # Use the existing stub generator as the base
        content = self.stub_generator.generate_stub_from_schema(
            schema,
            placeholder_style='default',
            schema_file_path=schema_file_path,
        )

        # Add data-driven enhancements - replace placeholders with actual data
        return self._apply_data_replacements(content, record)

    def _apply_data_replacements(self, content: str, record: Dict[str, Any]) -> str:
        """
        Apply data replacements to content using various replacement strategies.

        Args:
            content: Draft text containing placeholders
            record: Data record; each value is converted with ``str()``

        Returns:
            ``content`` after applying the replacements for every field, in
            the record's iteration order
        """
        for field_name, field_value in record.items():
            content = self._apply_field_replacements(
                content, field_name, str(field_value)
            )
        return content

    def _apply_field_replacements(
        self, content: str, field_name: str, field_value: str
    ) -> str:
        """
        Apply all replacement patterns for a specific field.

        Args:
            content: Draft text
            field_name: Name of the data field
            field_value: String value to insert

        Returns:
            ``content`` with ``TODO: Add content for <field_name>`` and
            ``{<field_name>}`` replaced by ``field_value``; for the ``role``
            field the role-specific replacements are applied as well
        """
        # Simple placeholder replacement
        content = content.replace(f"TODO: Add content for {field_name}", field_value)

        # Template variable replacement (e.g., {role} -> Software Engineer)
        content = content.replace(f"{{{field_name}}}", field_value)

        # Role-specific content replacement (can be extended for other field types)
        if field_name == 'role':
            content = self._apply_role_specific_replacements(content, field_value)

        return content

    def _apply_role_specific_replacements(self, content: str, role_value: str) -> str:
        """
        Apply role-specific content replacements.

        Args:
            content: Draft text
            role_value: Value of the record's ``role`` field

        Returns:
            ``content`` with the introduction and ``section_level_2``
            placeholder sentences replaced by text mentioning ``role_value``
        """
        replacements = {
            "TODO: Add content for introduction section.": f"Role: {role_value}",
            "TODO: Add content for section_level_2 section.": (
                f"Department information and role details for {role_value}"
            ),
        }

        for old_text, new_text in replacements.items():
            content = content.replace(old_text, new_text)

        return content