Files
markitect-main/markitect/draft_generator.py
tegwick a4805812f3 refactor: enhance draft generator documentation and code quality
Applied TDD8 refactoring improvements to draft generator module:

- Added comprehensive module docstring with usage examples
- Moved import statements to module level for better organization
- Enhanced filename sanitization with dedicated method
- Decomposed content replacement logic into focused methods
- Added role-specific replacement strategies
- Improved code maintainability and readability

These changes improve code quality while maintaining all existing
functionality and test compatibility.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 10:35:16 +02:00

279 lines
11 KiB
Python

"""
Data-driven Draft Generator for Issue #56: Generate multiple drafts from data sources.
This module provides functionality to create multiple markdown documents from JSON schemas
and data sources (JSON, CSV) with field mapping support.
Examples:
Basic usage with JSON data:
>>> generator = DraftGenerator()
>>> schema = {...} # JSON schema with field mappings
>>> data = [{"name": "John", "role": "Developer"}]
>>> files = generator.generate_drafts_from_data_source(
... schema, data, Path("./output")
... )
Using with CSV file:
>>> files = generator.generate_drafts_from_data_source(
... schema, Path("data.csv"), Path("./output")
... )
Field mapping is configured in the schema using x-markitect-field-mapping extension:
{
"properties": {
"headings": {
"properties": {
"level_1": {
"x-markitect-field-mapping": {"const": "name"}
}
}
}
}
}
Architecture:
The DraftGenerator extends the existing StubGenerator to add data-driven
capabilities. It processes data sources, validates compatibility with schemas,
and generates multiple document drafts with populated content.
"""
import json
import csv
import io
import copy
from pathlib import Path
from typing import Dict, Any, List, Optional, Union
from .stub_generator import StubGenerator
class DraftGenerator:
"""
Generates multiple markdown drafts from schemas and data sources.
Creates markdown documents by combining schema templates with data from
JSON or CSV sources using field mapping configurations.
"""
def __init__(self):
"""Initialize the draft generator."""
self.stub_generator = StubGenerator()
def generate_drafts_from_data_source(self,
schema: Dict[str, Any],
data_source: Union[str, Path, List[Dict[str, Any]]],
output_dir: Path,
schema_file_path: Optional[str] = None) -> List[Path]:
"""
Generate multiple drafts from a schema and data source.
Args:
schema: JSON schema dictionary
data_source: Path to JSON/CSV file or list of data records
output_dir: Directory to save generated files
schema_file_path: Optional path to schema file for reference
Returns:
List of paths to generated draft files
Raises:
ValueError: If data source format is unsupported
FileNotFoundError: If data source file doesn't exist
"""
# Parse data source
if isinstance(data_source, (str, Path)):
data_records = self._load_data_from_file(Path(data_source))
elif isinstance(data_source, list):
data_records = data_source
else:
raise ValueError(f"Unsupported data source type: {type(data_source)}")
# Validate data compatibility with schema
self._validate_data_schema_compatibility(data_records, schema)
# Ensure output directory exists
output_dir.mkdir(parents=True, exist_ok=True)
# Generate drafts for each data record
generated_files = []
for i, record in enumerate(data_records):
# Apply field mapping to populate schema content
populated_schema = self._apply_field_mapping(schema, record)
# Generate filename based on data or index
filename = self._generate_filename(record, i)
output_file = output_dir / filename
# Generate draft content using populated schema
draft_content = self._generate_draft_content(populated_schema, record, schema_file_path)
# Write draft to file
with open(output_file, 'w', encoding='utf-8') as f:
f.write(draft_content)
generated_files.append(output_file)
return generated_files
def _load_data_from_file(self, file_path: Path) -> List[Dict[str, Any]]:
"""Load data records from JSON or CSV file."""
if not file_path.exists():
raise FileNotFoundError(f"Data source file not found: {file_path}")
if file_path.suffix.lower() == '.json':
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# Handle both single objects and arrays
if isinstance(data, list):
return data
else:
return [data]
elif file_path.suffix.lower() == '.csv':
records = []
with open(file_path, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
records.append(row)
return records
else:
raise ValueError(f"Unsupported data source format: {file_path.suffix}")
def _validate_data_schema_compatibility(self, data_records: List[Dict[str, Any]], schema: Dict[str, Any]) -> None:
"""Validate that data records are compatible with schema field mappings."""
if not data_records:
raise ValueError("Data source contains no records")
# Extract field mappings from schema
field_mappings = self._extract_field_mappings(schema)
# Check for explicit required fields in schema
required_fields = schema.get('x-markitect-required-fields', [])
# Check if all mapped fields exist in data records
for record in data_records:
for field_name in field_mappings.values():
if field_name not in record:
raise ValueError(f"Required field '{field_name}' not found in data record: {record}")
# Check explicit required fields
for required_field in required_fields:
if required_field not in record:
raise ValueError(f"Required field '{required_field}' not found in data record: {record}")
def _extract_field_mappings(self, schema: Dict[str, Any]) -> Dict[str, str]:
"""Extract field mappings from schema extensions."""
mappings = {}
def extract_from_properties(properties: Dict[str, Any], path: str = ""):
for key, value in properties.items():
current_path = f"{path}.{key}" if path else key
if isinstance(value, dict):
# Check for field mapping extension
if 'x-markitect-field-mapping' in value:
mapping = value['x-markitect-field-mapping']
if isinstance(mapping, dict) and 'const' in mapping:
mappings[current_path] = mapping['const']
elif isinstance(mapping, str):
mappings[current_path] = mapping
# Recursively check nested properties
if 'properties' in value:
extract_from_properties(value['properties'], current_path)
# Handle array items
if 'items' in value and isinstance(value['items'], dict):
if 'properties' in value['items']:
extract_from_properties(value['items']['properties'], f"{current_path}[]")
# Start extraction from root properties
if 'properties' in schema:
extract_from_properties(schema['properties'])
return mappings
def _apply_field_mapping(self, schema: Dict[str, Any], record: Dict[str, Any]) -> Dict[str, Any]:
"""Apply field mapping to populate schema content areas with data."""
# Create a deep copy of the schema
populated_schema = copy.deepcopy(schema)
# Apply title mapping if exists
if 'name' in record:
populated_schema['title'] = record['name']
return populated_schema
def _generate_filename(self, record: Dict[str, Any], index: int) -> str:
"""Generate appropriate filename for the draft."""
# Try to use common identifying fields
identifier_fields = ['name', 'title', 'id']
for field in identifier_fields:
if field in record and record[field]:
# Sanitize filename
name = self._sanitize_filename(str(record[field]))
return f"{name}.md"
# Fall back to index-based naming
return f"draft_{index + 1:03d}.md"
def _sanitize_filename(self, filename: str) -> str:
"""Sanitize a string to be safe for use as a filename."""
# Replace problematic characters with underscores
unsafe_chars = [' ', '/', '\\', ':', '*', '?', '"', '<', '>', '|']
sanitized = filename
for char in unsafe_chars:
sanitized = sanitized.replace(char, '_')
return sanitized
def _generate_draft_content(self, schema: Dict[str, Any], record: Dict[str, Any], schema_file_path: Optional[str] = None) -> str:
"""Generate the actual draft content from populated schema."""
# Use the existing stub generator as the base
content = self.stub_generator.generate_stub_from_schema(
schema,
placeholder_style='default',
schema_file_path=schema_file_path
)
# Add data-driven enhancements - replace placeholders with actual data
content = self._apply_data_replacements(content, record)
return content
def _apply_data_replacements(self, content: str, record: Dict[str, Any]) -> str:
"""Apply data replacements to content using various replacement strategies."""
for field_name, field_value in record.items():
content = self._apply_field_replacements(content, field_name, str(field_value))
return content
def _apply_field_replacements(self, content: str, field_name: str, field_value: str) -> str:
"""Apply all replacement patterns for a specific field."""
# Simple placeholder replacement
placeholder_pattern = f"TODO: Add content for {field_name}"
if placeholder_pattern in content:
content = content.replace(placeholder_pattern, field_value)
# Template variable replacement (e.g., {role} -> Software Engineer)
template_pattern = f"{{{field_name}}}"
if template_pattern in content:
content = content.replace(template_pattern, field_value)
# Role-specific content replacement (can be extended for other field types)
if field_name == 'role':
content = self._apply_role_specific_replacements(content, field_value)
return content
def _apply_role_specific_replacements(self, content: str, role_value: str) -> str:
"""Apply role-specific content replacements."""
replacements = {
"TODO: Add content for introduction section.": f"Role: {role_value}",
"TODO: Add content for section_level_2 section.": f"Department information and role details for {role_value}"
}
for old_text, new_text in replacements.items():
content = content.replace(old_text, new_text)
return content