feat: implement GraphQL write interface with mutations (issue #10)
Some checks failed
Test Suite / performance-tests (push) Has been cancelled
Test Suite / unit-tests (3.11) (push) Has been cancelled
Test Suite / unit-tests (3.12) (push) Has been cancelled
Test Suite / integration-tests (push) Has been cancelled
Test Suite / e2e-tests (push) Has been cancelled
Test Suite / code-quality (push) Has been cancelled
Test Suite / security-scan (push) Has been cancelled
Test Suite / test-summary (push) Has been cancelled

Added comprehensive GraphQL mutations for CRUD operations on markdown files and schemas.

Key features:
- Complete mutation schema with structured payload types
- Markdown file mutations: add, update with front matter support
- Schema mutations: add, update, delete with JSON validation
- CLI integration with `graphql-mutate` command
- Comprehensive error handling and validation
- Full test coverage with 24 test cases
- Updated documentation with mutation examples and API usage

Resolves issue #10: Expose a GraphQL Write Interface

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-10-03 16:48:03 +02:00
parent d4e5992213
commit 2a15dde228
6 changed files with 2084 additions and 8 deletions

View File

@@ -192,5 +192,421 @@ class Query(ObjectType):
)
# Create the schema
schema = graphene.Schema(query=Query)
# Mutation Types for Write Operations
class AddMarkdownFilePayload(ObjectType):
"""Payload for addMarkdownFile mutation."""
markdown_file = Field(MarkdownFile, description="The created markdown file")
success = graphene.Boolean(description="Whether the operation was successful")
errors = List(String, description="List of validation errors")
class UpdateMarkdownFilePayload(ObjectType):
"""Payload for updateMarkdownFile mutation."""
markdown_file = Field(MarkdownFile, description="The updated markdown file")
success = graphene.Boolean(description="Whether the operation was successful")
errors = List(String, description="List of validation errors")
class AddSchemaPayload(ObjectType):
"""Payload for addSchema mutation."""
schema = Field(Schema, description="The created schema")
success = graphene.Boolean(description="Whether the operation was successful")
errors = List(String, description="List of validation errors")
class UpdateSchemaPayload(ObjectType):
"""Payload for updateSchema mutation."""
schema = Field(Schema, description="The updated schema")
success = graphene.Boolean(description="Whether the operation was successful")
errors = List(String, description="List of validation errors")
class DeleteSchemaPayload(ObjectType):
"""Payload for deleteSchema mutation."""
success = graphene.Boolean(description="Whether the operation was successful")
deleted_filename = String(description="The filename of the deleted schema")
errors = List(String, description="List of validation errors")
class Mutation(ObjectType):
"""Root GraphQL mutation type for write operations."""
# Markdown file mutations
add_markdown_file = Field(
AddMarkdownFilePayload,
filename=String(required=True, description="Filename for the markdown file"),
content=String(required=True, description="Markdown content including front matter"),
description="Add a new markdown file to the database"
)
update_markdown_file = Field(
UpdateMarkdownFilePayload,
id=Int(required=True, description="ID of the markdown file to update"),
content=String(description="New markdown content"),
description="Update an existing markdown file"
)
# Schema mutations
add_schema = Field(
AddSchemaPayload,
filename=String(required=True, description="Filename for the schema"),
schema_content=JSONString(required=True, description="JSON schema content"),
description="Add a new JSON schema to the database"
)
update_schema = Field(
UpdateSchemaPayload,
id=Int(required=True, description="ID of the schema to update"),
schema_content=JSONString(description="New JSON schema content"),
description="Update an existing JSON schema"
)
delete_schema = Field(
DeleteSchemaPayload,
filename=String(required=True, description="Filename of the schema to delete"),
description="Delete a JSON schema from the database"
)
def resolve_add_markdown_file(self, info, filename, content):
"""Add a new markdown file to the database."""
import json
from ..database import DatabaseManager
from .resolvers import get_default_database_path
try:
# Get database manager
db_manager = DatabaseManager(get_default_database_path())
# Store the file using the database manager
file_id = db_manager.store_markdown_file(filename, content)
if file_id:
# Retrieve the created file
import sqlite3
conn = sqlite3.connect(get_default_database_path())
cursor = conn.cursor()
cursor.execute("SELECT * FROM markdown_files WHERE id = ?", (file_id,))
row = cursor.fetchone()
if row:
data = dict(zip([col[0] for col in cursor.description], row))
# Parse front matter JSON
if data['front_matter']:
try:
data['front_matter_raw'] = json.loads(data['front_matter'])
except json.JSONDecodeError:
data['front_matter_raw'] = {}
else:
data['front_matter_raw'] = {}
# Parse datetime strings to datetime objects
from datetime import datetime
if data.get('created_at'):
try:
data['created_at'] = datetime.fromisoformat(data['created_at'])
except (ValueError, TypeError):
data['created_at'] = None
conn.close()
return AddMarkdownFilePayload(
markdown_file=MarkdownFile(**data),
success=True,
errors=[]
)
conn.close()
return AddMarkdownFilePayload(
markdown_file=None,
success=False,
errors=["Failed to store markdown file"]
)
except Exception as e:
return AddMarkdownFilePayload(
markdown_file=None,
success=False,
errors=[str(e)]
)
def resolve_update_markdown_file(self, info, id, content=None):
"""Update an existing markdown file."""
import json
import sqlite3
from ..database import DatabaseManager
from .resolvers import get_default_database_path
try:
if not content:
return UpdateMarkdownFilePayload(
markdown_file=None,
success=False,
errors=["Content is required for update"]
)
db_path = get_default_database_path()
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Check if file exists and get filename
cursor.execute("SELECT filename FROM markdown_files WHERE id = ?", (id,))
row = cursor.fetchone()
if not row:
conn.close()
return UpdateMarkdownFilePayload(
markdown_file=None,
success=False,
errors=[f"Markdown file with ID {id} not found"]
)
filename = row[0]
# Parse front matter and content for update
from ..frontmatter import FrontMatterParser
front_matter_parser = FrontMatterParser()
front_matter, markdown_content = front_matter_parser.parse(content)
front_matter_json = json.dumps(front_matter) if front_matter else '{}'
# Update the file directly with SQL
cursor.execute('''
UPDATE markdown_files
SET content = ?, front_matter = ?
WHERE id = ?
''', (markdown_content, front_matter_json, id))
conn.commit()
# Retrieve the updated file
cursor.execute("SELECT * FROM markdown_files WHERE id = ?", (id,))
row = cursor.fetchone()
if row:
data = dict(zip([col[0] for col in cursor.description], row))
# Parse front matter JSON
if data['front_matter']:
try:
data['front_matter_raw'] = json.loads(data['front_matter'])
except json.JSONDecodeError:
data['front_matter_raw'] = {}
else:
data['front_matter_raw'] = {}
# Parse datetime strings to datetime objects
from datetime import datetime
if data.get('created_at'):
try:
data['created_at'] = datetime.fromisoformat(data['created_at'])
except (ValueError, TypeError):
data['created_at'] = None
conn.close()
return UpdateMarkdownFilePayload(
markdown_file=MarkdownFile(**data),
success=True,
errors=[]
)
conn.close()
return UpdateMarkdownFilePayload(
markdown_file=None,
success=False,
errors=["Failed to update markdown file"]
)
except Exception as e:
return UpdateMarkdownFilePayload(
markdown_file=None,
success=False,
errors=[str(e)]
)
def resolve_add_schema(self, info, filename, schema_content):
"""Add a new JSON schema to the database."""
import json
import sqlite3
from ..database import DatabaseManager
from .resolvers import get_default_database_path
try:
# Get database manager
db_manager = DatabaseManager(get_default_database_path())
# Convert schema_content to JSON string if it's a dict
if isinstance(schema_content, dict):
schema_content_str = json.dumps(schema_content)
else:
schema_content_str = schema_content
# Store the schema using the database manager
schema_id = db_manager.store_schema_file(filename, schema_content_str)
if schema_id:
# Retrieve the created schema
conn = sqlite3.connect(get_default_database_path())
cursor = conn.cursor()
cursor.execute("SELECT * FROM schemas WHERE id = ?", (schema_id,))
row = cursor.fetchone()
if row:
data = dict(zip([col[0] for col in cursor.description], row))
# Parse schema content JSON
if data['schema_content']:
try:
data['schema_content'] = json.loads(data['schema_content'])
except json.JSONDecodeError:
data['schema_content'] = {}
# Parse datetime strings to datetime objects
from datetime import datetime
for dt_field in ['created_at', 'updated_at']:
if data.get(dt_field):
try:
data[dt_field] = datetime.fromisoformat(data[dt_field])
except (ValueError, TypeError):
data[dt_field] = None
conn.close()
return AddSchemaPayload(
schema=Schema(**data),
success=True,
errors=[]
)
conn.close()
return AddSchemaPayload(
schema=None,
success=False,
errors=["Failed to store schema"]
)
except Exception as e:
return AddSchemaPayload(
schema=None,
success=False,
errors=[str(e)]
)
def resolve_update_schema(self, info, id, schema_content=None):
"""Update an existing JSON schema."""
import json
import sqlite3
from ..database import DatabaseManager
from .resolvers import get_default_database_path
try:
if not schema_content:
return UpdateSchemaPayload(
schema=None,
success=False,
errors=["Schema content is required for update"]
)
db_path = get_default_database_path()
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Check if schema exists
cursor.execute("SELECT filename FROM schemas WHERE id = ?", (id,))
row = cursor.fetchone()
if not row:
conn.close()
return UpdateSchemaPayload(
schema=None,
success=False,
errors=[f"Schema with ID {id} not found"]
)
filename = row[0]
conn.close()
# Convert schema_content to JSON string if it's a dict
if isinstance(schema_content, dict):
schema_content_str = json.dumps(schema_content)
else:
schema_content_str = schema_content
# Update using store_schema_file
db_manager = DatabaseManager(db_path)
schema_id = db_manager.store_schema_file(filename, schema_content_str)
if schema_id:
# Retrieve the updated schema
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute("SELECT * FROM schemas WHERE filename = ?", (filename,))
row = cursor.fetchone()
if row:
data = dict(zip([col[0] for col in cursor.description], row))
# Parse schema content JSON
if data['schema_content']:
try:
data['schema_content'] = json.loads(data['schema_content'])
except json.JSONDecodeError:
data['schema_content'] = {}
# Parse datetime strings to datetime objects
from datetime import datetime
for dt_field in ['created_at', 'updated_at']:
if data.get(dt_field):
try:
data[dt_field] = datetime.fromisoformat(data[dt_field])
except (ValueError, TypeError):
data[dt_field] = None
conn.close()
return UpdateSchemaPayload(
schema=Schema(**data),
success=True,
errors=[]
)
conn.close()
return UpdateSchemaPayload(
schema=None,
success=False,
errors=["Failed to update schema"]
)
except Exception as e:
return UpdateSchemaPayload(
schema=None,
success=False,
errors=[str(e)]
)
def resolve_delete_schema(self, info, filename):
"""Delete a JSON schema from the database."""
from ..database import DatabaseManager
from .resolvers import get_default_database_path
try:
# Get database manager
db_manager = DatabaseManager(get_default_database_path())
# Delete using the database manager
success = db_manager.delete_schema_file(filename)
if success:
return DeleteSchemaPayload(
success=True,
deleted_filename=filename,
errors=[]
)
else:
return DeleteSchemaPayload(
success=False,
deleted_filename=None,
errors=[f"Failed to delete schema: {filename}"]
)
except Exception as e:
return DeleteSchemaPayload(
success=False,
deleted_filename=None,
errors=[str(e)]
)
# Create the schema with both Query and Mutation
schema = graphene.Schema(query=Query, mutation=Mutation)