Files
tegwick 2a15dde228
Some checks failed
Test Suite / performance-tests (push) Has been cancelled
Test Suite / unit-tests (3.11) (push) Has been cancelled
Test Suite / unit-tests (3.12) (push) Has been cancelled
Test Suite / integration-tests (push) Has been cancelled
Test Suite / e2e-tests (push) Has been cancelled
Test Suite / code-quality (push) Has been cancelled
Test Suite / security-scan (push) Has been cancelled
Test Suite / test-summary (push) Has been cancelled
feat: implement GraphQL write interface with mutations (issue #10)
Added comprehensive GraphQL mutations for CRUD operations on markdown files and schemas.

Key features:
- Complete mutation schema with structured payload types
- Markdown file mutations: add, update with front matter support
- Schema mutations: add, update, delete with JSON validation
- CLI integration with `graphql-mutate` command
- Comprehensive error handling and validation
- Full test coverage with 24 test cases
- Updated documentation with mutation examples and API usage

Resolves issue #10: Expose a GraphQL Write Interface

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 16:48:03 +02:00

612 lines
22 KiB
Python

"""
GraphQL schema definition for MarkiTect data.
Defines the complete GraphQL schema for querying Markdown files,
ASTs, schemas, and related metadata, plus the mutations used to
add, update, and delete stored Markdown files and JSON schemas.
"""
import graphene
from graphene import ObjectType, String, Int, DateTime, List, Field, JSONString
from typing import Optional
class FrontMatter(ObjectType):
    """GraphQL type for front matter data."""

    # One entry of a file's front matter, exposed as a key/value pair.
    key = String(
        required=True,
        description="Front matter key",
    )
    # JSONString lets the value carry any JSON-serialisable payload.
    value = JSONString(
        description="Front matter value (can be any JSON type)",
    )
class MarkdownFile(ObjectType):
    """GraphQL type for markdown files stored in MarkiTect."""

    # Stored fields
    id = Int(required=True, description="Unique identifier")
    filename = String(required=True, description="File path/name")
    content = String(description="Markdown content")
    front_matter = List(FrontMatter, description="Parsed front matter data")
    front_matter_raw = JSONString(description="Raw front matter as JSON")
    created_at = DateTime(description="Creation timestamp")

    # Computed fields
    word_count = Int(description="Number of words in content")
    line_count = Int(description="Number of lines in content")
    has_front_matter = graphene.Boolean(description="Whether file has front matter")

    def resolve_front_matter(self, info):
        """Return the raw front matter as a list of ``FrontMatter`` pairs.

        An empty list is returned when ``front_matter_raw`` is missing,
        empty, or not a mapping. The ``isinstance`` guard mirrors the one
        used by the ``Schema`` resolvers and prevents an ``AttributeError``
        when the raw value is, for example, a JSON string or a list.
        """
        raw = self.front_matter_raw
        if not isinstance(raw, dict):
            return []
        return [FrontMatter(key=key, value=value) for key, value in raw.items()]

    def resolve_word_count(self, info):
        """Return the number of whitespace-separated words in ``content``.

        Returns 0 when there is no content.
        """
        if self.content:
            return len(self.content.split())
        return 0

    def resolve_line_count(self, info):
        """Return the number of lines in ``content`` (0 when there is none)."""
        if self.content:
            return len(self.content.splitlines())
        return 0

    def resolve_has_front_matter(self, info):
        """Return ``True`` when ``front_matter_raw`` is truthy."""
        return bool(self.front_matter_raw)
class Schema(ObjectType):
    """GraphQL type for JSON schemas."""

    # Stored fields
    id = Int(required=True, description="Unique identifier")
    filename = String(required=True, description="Schema filename")
    title = String(description="Schema title")
    description = String(description="Schema description")
    schema_content = JSONString(required=True, description="JSON schema content")
    created_at = DateTime(description="Creation timestamp")
    updated_at = DateTime(description="Last update timestamp")

    # Computed fields
    schema_version = String(description="JSON Schema version")
    property_count = Int(description="Number of properties in schema")

    def resolve_schema_version(self, info):
        """Return the ``$schema`` entry of the content, or ``'Unknown'``.

        ``'Unknown'`` is returned when the content is empty, is not a
        dict, or has no ``$schema`` key.
        """
        content = self.schema_content
        if not content or not isinstance(content, dict):
            return 'Unknown'
        return content.get('$schema', 'Unknown')

    def resolve_property_count(self, info):
        """Return the size of the ``properties`` entry of the content.

        0 is returned when the content is empty, is not a dict, or has no
        ``properties`` key.
        """
        content = self.schema_content
        if not content:
            return 0
        if not isinstance(content, dict):
            return 0
        if 'properties' not in content:
            return 0
        return len(content['properties'])
class ASTNode(ObjectType):
    """GraphQL type for AST nodes."""

    type = String(
        required=True,
        description="Node type",
    )
    value = String(description="Node value/content")
    level = Int(description="Heading level (for heading nodes)")
    # The lambda defers the lookup of ASTNode until the class exists,
    # which is what allows the type to reference itself.
    children = List(
        lambda: ASTNode,
        description="Child nodes",
    )
    attrs = JSONString(description="Node attributes")
class AST(ObjectType):
    """GraphQL type for parsed AST."""

    # Identification of the source document
    file_id = Int(description="Associated file ID")
    filename = String(
        required=True,
        description="Source filename",
    )

    # Parsed structure
    tree = List(
        ASTNode,
        description="AST tree structure",
    )
    metadata = JSONString(description="AST metadata")

    # Statistics
    heading_count = Int(description="Number of headings")
    link_count = Int(description="Number of links")
    image_count = Int(description="Number of images")
    code_block_count = Int(description="Number of code blocks")
class DatabaseStats(ObjectType):
    """Database statistics."""

    # Row counts
    total_files = Int(
        description="Total number of markdown files",
    )
    total_schemas = Int(
        description="Total number of schemas",
    )

    # Storage footprint and freshness
    total_size_bytes = Int(
        description="Total database size in bytes",
    )
    last_updated = DateTime(
        description="Last database update",
    )
class SearchResult(ObjectType):
    """Search result union type."""

    # NOTE: despite the wording of the docstring this is a plain
    # ObjectType, not a graphene Union; `type` names the kind of match and
    # the `file` / `schema` fields are described as being set accordingly.
    type = String(
        required=True,
        description="Result type (file, schema)",
    )
    score = graphene.Float(description="Search relevance score")
    file = Field(
        MarkdownFile,
        description="Matched file (if type=file)",
    )
    schema = Field(
        Schema,
        description="Matched schema (if type=schema)",
    )
    highlight = String(description="Highlighted match text")
class Query(ObjectType):
    """Root GraphQL query type."""

    # Only field declarations live here; no resolve_* methods are defined
    # in this class body.

    # --- Single item queries -------------------------------------------
    markdown_file = Field(
        MarkdownFile,
        description="Get a specific markdown file",
        id=Int(description="File ID"),
        filename=String(description="File path"),
    )
    schema = Field(
        Schema,
        description="Get a specific schema",
        id=Int(description="Schema ID"),
        filename=String(description="Schema filename"),
    )
    ast = Field(
        AST,
        description="Get AST for a specific file",
        file_id=Int(description="File ID"),
        filename=String(description="File path"),
    )

    # --- List queries (paginated through limit/offset) ------------------
    markdown_files = List(
        MarkdownFile,
        description="List markdown files with optional filtering",
        limit=Int(default_value=50, description="Maximum number of results"),
        offset=Int(default_value=0, description="Offset for pagination"),
        has_front_matter=graphene.Boolean(
            description="Filter by front matter presence",
        ),
        created_after=DateTime(description="Filter by creation date"),
    )
    schemas = List(
        Schema,
        description="List all schemas",
        limit=Int(default_value=50, description="Maximum number of results"),
        offset=Int(default_value=0, description="Offset for pagination"),
    )

    # --- Search ----------------------------------------------------------
    search = List(
        SearchResult,
        description="Search across files and schemas",
        query=String(required=True, description="Search query"),
        type=String(description="Search type filter (file, schema, all)"),
        limit=Int(default_value=20, description="Maximum number of results"),
    )

    # --- Statistics ------------------------------------------------------
    database_stats = Field(
        DatabaseStats,
        description="Get database statistics",
    )

    # --- JSONPath queries for ASTs --------------------------------------
    ast_query = List(
        JSONString,
        description="Query AST using JSONPath expressions",
        file_id=Int(),
        filename=String(),
        jsonpath=String(required=True, description="JSONPath expression"),
    )
# Mutation Types for Write Operations
class AddMarkdownFilePayload(ObjectType):
    """Payload for addMarkdownFile mutation."""

    # Result object, overall outcome flag, and any error messages.
    markdown_file = Field(
        MarkdownFile,
        description="The created markdown file",
    )
    success = graphene.Boolean(
        description="Whether the operation was successful",
    )
    errors = List(
        String,
        description="List of validation errors",
    )
class UpdateMarkdownFilePayload(ObjectType):
    """Payload for updateMarkdownFile mutation."""

    # Result object, overall outcome flag, and any error messages.
    markdown_file = Field(
        MarkdownFile,
        description="The updated markdown file",
    )
    success = graphene.Boolean(
        description="Whether the operation was successful",
    )
    errors = List(
        String,
        description="List of validation errors",
    )
class AddSchemaPayload(ObjectType):
    """Payload for addSchema mutation."""

    # Result object, overall outcome flag, and any error messages.
    schema = Field(
        Schema,
        description="The created schema",
    )
    success = graphene.Boolean(
        description="Whether the operation was successful",
    )
    errors = List(
        String,
        description="List of validation errors",
    )
class UpdateSchemaPayload(ObjectType):
    """Payload for updateSchema mutation."""

    # Result object, overall outcome flag, and any error messages.
    schema = Field(
        Schema,
        description="The updated schema",
    )
    success = graphene.Boolean(
        description="Whether the operation was successful",
    )
    errors = List(
        String,
        description="List of validation errors",
    )
class DeleteSchemaPayload(ObjectType):
    """Payload for deleteSchema mutation."""

    # Outcome flag first, then the affected filename and error messages.
    success = graphene.Boolean(
        description="Whether the operation was successful",
    )
    deleted_filename = String(
        description="The filename of the deleted schema",
    )
    errors = List(
        String,
        description="List of validation errors",
    )
# Private helpers shared by the Mutation resolvers. They are module-level
# functions rather than methods because graphene calls resolvers with the
# parent/root value as the first argument, not with a Mutation instance.


def _row_to_dict(cursor, row):
    """Map a fetched *row* onto the column names reported by *cursor*."""
    return dict(zip((col[0] for col in cursor.description), row))


def _fetch_one_as_dict(db_path, query, params):
    """Run *query* against *db_path* and return the first row as a dict.

    Returns ``None`` when the query yields no row. The connection is
    always closed, including when the query raises.
    """
    import sqlite3
    from contextlib import closing

    with closing(sqlite3.connect(db_path)) as conn:
        cursor = conn.cursor()
        cursor.execute(query, params)
        row = cursor.fetchone()
        return None if row is None else _row_to_dict(cursor, row)


def _coerce_iso_datetimes(data, field_names):
    """Replace ISO-format strings in *data* with ``datetime`` objects, in place.

    Falsy or missing values are left untouched; values that cannot be
    parsed are replaced by ``None``.
    """
    from datetime import datetime

    for name in field_names:
        if data.get(name):
            try:
                data[name] = datetime.fromisoformat(data[name])
            except (ValueError, TypeError):
                data[name] = None


def _markdown_file_from_row(data):
    """Build a ``MarkdownFile`` from a ``markdown_files`` row dict."""
    import json

    # The front_matter column holds JSON text; expose the decoded form as
    # front_matter_raw and fall back to an empty dict when it is empty or
    # not valid JSON.
    if data['front_matter']:
        try:
            data['front_matter_raw'] = json.loads(data['front_matter'])
        except json.JSONDecodeError:
            data['front_matter_raw'] = {}
    else:
        data['front_matter_raw'] = {}
    _coerce_iso_datetimes(data, ('created_at',))
    return MarkdownFile(**data)


def _schema_from_row(data):
    """Build a ``Schema`` from a ``schemas`` row dict."""
    import json

    # schema_content is stored as JSON text; decode it, falling back to an
    # empty dict when the stored text is not valid JSON.
    if data['schema_content']:
        try:
            data['schema_content'] = json.loads(data['schema_content'])
        except json.JSONDecodeError:
            data['schema_content'] = {}
    _coerce_iso_datetimes(data, ('created_at', 'updated_at'))
    return Schema(**data)


def _schema_content_to_text(schema_content):
    """Return dict content serialised as JSON text; pass other values through."""
    import json

    if isinstance(schema_content, dict):
        return json.dumps(schema_content)
    return schema_content


class Mutation(ObjectType):
    """Root GraphQL mutation type for write operations."""

    # Markdown file mutations
    add_markdown_file = Field(
        AddMarkdownFilePayload,
        filename=String(required=True, description="Filename for the markdown file"),
        content=String(required=True, description="Markdown content including front matter"),
        description="Add a new markdown file to the database"
    )
    update_markdown_file = Field(
        UpdateMarkdownFilePayload,
        id=Int(required=True, description="ID of the markdown file to update"),
        content=String(description="New markdown content"),
        description="Update an existing markdown file"
    )

    # Schema mutations
    add_schema = Field(
        AddSchemaPayload,
        filename=String(required=True, description="Filename for the schema"),
        schema_content=JSONString(required=True, description="JSON schema content"),
        description="Add a new JSON schema to the database"
    )
    update_schema = Field(
        UpdateSchemaPayload,
        id=Int(required=True, description="ID of the schema to update"),
        schema_content=JSONString(description="New JSON schema content"),
        description="Update an existing JSON schema"
    )
    delete_schema = Field(
        DeleteSchemaPayload,
        filename=String(required=True, description="Filename of the schema to delete"),
        description="Delete a JSON schema from the database"
    )

    def resolve_add_markdown_file(self, info, filename, content):
        """Store a new markdown file and return it in a payload.

        The file is stored through ``DatabaseManager.store_markdown_file``
        and then read back from the ``markdown_files`` table. Any exception
        is reported through the payload's ``errors`` list instead of being
        raised.
        """
        from ..database import DatabaseManager
        from .resolvers import get_default_database_path

        try:
            db_path = get_default_database_path()
            file_id = DatabaseManager(db_path).store_markdown_file(filename, content)

            data = None
            if file_id:
                # Read the freshly stored row back so the payload reflects
                # what is actually in the database.
                data = _fetch_one_as_dict(
                    db_path,
                    "SELECT * FROM markdown_files WHERE id = ?",
                    (file_id,),
                )
            if data is not None:
                return AddMarkdownFilePayload(
                    markdown_file=_markdown_file_from_row(data),
                    success=True,
                    errors=[]
                )
            return AddMarkdownFilePayload(
                markdown_file=None,
                success=False,
                errors=["Failed to store markdown file"]
            )
        except Exception as e:
            return AddMarkdownFilePayload(
                markdown_file=None,
                success=False,
                errors=[str(e)]
            )

    def resolve_update_markdown_file(self, info, id, content=None):
        """Replace the content and front matter of an existing markdown file.

        ``content`` is split by ``FrontMatterParser`` into front matter and
        body, both of which are written to the row identified by ``id``.
        Missing content, an unknown ``id``, and any exception are reported
        through the payload's ``errors`` list.
        """
        import json
        import sqlite3
        from contextlib import closing
        from .resolvers import get_default_database_path

        try:
            if not content:
                return UpdateMarkdownFilePayload(
                    markdown_file=None,
                    success=False,
                    errors=["Content is required for update"]
                )

            db_path = get_default_database_path()
            # closing() guarantees the connection is released even when the
            # parser or one of the statements below raises.
            with closing(sqlite3.connect(db_path)) as conn:
                cursor = conn.cursor()

                # Check that the file exists before attempting the update.
                cursor.execute("SELECT filename FROM markdown_files WHERE id = ?", (id,))
                if cursor.fetchone() is None:
                    return UpdateMarkdownFilePayload(
                        markdown_file=None,
                        success=False,
                        errors=[f"Markdown file with ID {id} not found"]
                    )

                # Parse front matter and content for update
                from ..frontmatter import FrontMatterParser
                front_matter, markdown_content = FrontMatterParser().parse(content)
                front_matter_json = json.dumps(front_matter) if front_matter else '{}'

                # Update the file directly with SQL
                cursor.execute(
                    "UPDATE markdown_files SET content = ?, front_matter = ? WHERE id = ?",
                    (markdown_content, front_matter_json, id),
                )
                conn.commit()

                # Retrieve the updated file
                cursor.execute("SELECT * FROM markdown_files WHERE id = ?", (id,))
                row = cursor.fetchone()
                data = None if row is None else _row_to_dict(cursor, row)

            if data is not None:
                return UpdateMarkdownFilePayload(
                    markdown_file=_markdown_file_from_row(data),
                    success=True,
                    errors=[]
                )
            return UpdateMarkdownFilePayload(
                markdown_file=None,
                success=False,
                errors=["Failed to update markdown file"]
            )
        except Exception as e:
            return UpdateMarkdownFilePayload(
                markdown_file=None,
                success=False,
                errors=[str(e)]
            )

    def resolve_add_schema(self, info, filename, schema_content):
        """Store a new JSON schema and return it in a payload.

        Dict content is serialised to JSON text before being handed to
        ``DatabaseManager.store_schema_file``; the stored row is then read
        back from the ``schemas`` table. Any exception is reported through
        the payload's ``errors`` list.
        """
        from ..database import DatabaseManager
        from .resolvers import get_default_database_path

        try:
            db_path = get_default_database_path()
            schema_id = DatabaseManager(db_path).store_schema_file(
                filename, _schema_content_to_text(schema_content)
            )

            data = None
            if schema_id:
                # Retrieve the created schema
                data = _fetch_one_as_dict(
                    db_path,
                    "SELECT * FROM schemas WHERE id = ?",
                    (schema_id,),
                )
            if data is not None:
                return AddSchemaPayload(
                    schema=_schema_from_row(data),
                    success=True,
                    errors=[]
                )
            return AddSchemaPayload(
                schema=None,
                success=False,
                errors=["Failed to store schema"]
            )
        except Exception as e:
            return AddSchemaPayload(
                schema=None,
                success=False,
                errors=[str(e)]
            )

    def resolve_update_schema(self, info, id, schema_content=None):
        """Replace the content of an existing JSON schema.

        The schema's filename is looked up by ``id`` and the new content is
        written through ``DatabaseManager.store_schema_file`` under that
        filename; the row is then read back by filename. Missing content,
        an unknown ``id``, and any exception are reported through the
        payload's ``errors`` list.
        """
        from ..database import DatabaseManager
        from .resolvers import get_default_database_path

        try:
            # NOTE(review): any falsy content, including an empty object,
            # is rejected here - confirm that is intended.
            if not schema_content:
                return UpdateSchemaPayload(
                    schema=None,
                    success=False,
                    errors=["Schema content is required for update"]
                )

            db_path = get_default_database_path()

            # Check if schema exists
            existing = _fetch_one_as_dict(
                db_path,
                "SELECT filename FROM schemas WHERE id = ?",
                (id,),
            )
            if existing is None:
                return UpdateSchemaPayload(
                    schema=None,
                    success=False,
                    errors=[f"Schema with ID {id} not found"]
                )
            filename = existing['filename']

            # Update using store_schema_file
            schema_id = DatabaseManager(db_path).store_schema_file(
                filename, _schema_content_to_text(schema_content)
            )

            data = None
            if schema_id:
                # Retrieve the updated schema by filename rather than by
                # the id returned from store_schema_file.
                data = _fetch_one_as_dict(
                    db_path,
                    "SELECT * FROM schemas WHERE filename = ?",
                    (filename,),
                )
            if data is not None:
                return UpdateSchemaPayload(
                    schema=_schema_from_row(data),
                    success=True,
                    errors=[]
                )
            return UpdateSchemaPayload(
                schema=None,
                success=False,
                errors=["Failed to update schema"]
            )
        except Exception as e:
            return UpdateSchemaPayload(
                schema=None,
                success=False,
                errors=[str(e)]
            )

    def resolve_delete_schema(self, info, filename):
        """Delete the schema stored under ``filename``.

        Delegates to ``DatabaseManager.delete_schema_file``; a falsy result
        or any exception is reported through the payload's ``errors`` list.
        """
        from ..database import DatabaseManager
        from .resolvers import get_default_database_path

        try:
            db_manager = DatabaseManager(get_default_database_path())
            if db_manager.delete_schema_file(filename):
                return DeleteSchemaPayload(
                    success=True,
                    deleted_filename=filename,
                    errors=[]
                )
            return DeleteSchemaPayload(
                success=False,
                deleted_filename=None,
                errors=[f"Failed to delete schema: {filename}"]
            )
        except Exception as e:
            return DeleteSchemaPayload(
                success=False,
                deleted_filename=None,
                errors=[str(e)]
            )
# Executable schema object exposing both root types: Query for reads and
# Mutation for writes.
schema = graphene.Schema(
    query=Query,
    mutation=Mutation,
)