Some checks failed
Test Suite / performance-tests (push) Has been cancelled
Test Suite / unit-tests (3.11) (push) Has been cancelled
Test Suite / unit-tests (3.12) (push) Has been cancelled
Test Suite / integration-tests (push) Has been cancelled
Test Suite / e2e-tests (push) Has been cancelled
Test Suite / code-quality (push) Has been cancelled
Test Suite / security-scan (push) Has been cancelled
Test Suite / test-summary (push) Has been cancelled
Added comprehensive GraphQL mutations for CRUD operations on markdown files and schemas. Key features: - Complete mutation schema with structured payload types - Markdown file mutations: add, update with front matter support - Schema mutations: add, update, delete with JSON validation - CLI integration with `graphql-mutate` command - Comprehensive error handling and validation - Full test coverage with 24 test cases - Updated documentation with mutation examples and API usage Resolves issue #10: Expose a GraphQL Write Interface 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
612 lines
22 KiB
Python
612 lines
22 KiB
Python
"""
|
|
GraphQL schema definition for MarkiTect data.
|
|
|
|
Defines the complete GraphQL schema for querying Markdown files,
|
|
ASTs, schemas, and related metadata.
|
|
"""
|
|
|
|
import graphene
|
|
from graphene import ObjectType, String, Int, DateTime, List, Field, JSONString
|
|
from typing import Optional
|
|
|
|
|
|
class FrontMatter(ObjectType):
|
|
"""GraphQL type for front matter data."""
|
|
key = String(required=True, description="Front matter key")
|
|
value = JSONString(description="Front matter value (can be any JSON type)")
|
|
|
|
|
|
class MarkdownFile(ObjectType):
|
|
"""GraphQL type for markdown files stored in MarkiTect."""
|
|
id = Int(required=True, description="Unique identifier")
|
|
filename = String(required=True, description="File path/name")
|
|
content = String(description="Markdown content")
|
|
front_matter = List(FrontMatter, description="Parsed front matter data")
|
|
front_matter_raw = JSONString(description="Raw front matter as JSON")
|
|
created_at = DateTime(description="Creation timestamp")
|
|
|
|
# Computed fields
|
|
word_count = Int(description="Number of words in content")
|
|
line_count = Int(description="Number of lines in content")
|
|
has_front_matter = graphene.Boolean(description="Whether file has front matter")
|
|
|
|
def resolve_front_matter(self, info):
|
|
"""Resolve front matter as key-value pairs."""
|
|
if self.front_matter_raw:
|
|
return [
|
|
FrontMatter(key=k, value=v)
|
|
for k, v in self.front_matter_raw.items()
|
|
]
|
|
return []
|
|
|
|
def resolve_word_count(self, info):
|
|
"""Calculate word count."""
|
|
if self.content:
|
|
return len(self.content.split())
|
|
return 0
|
|
|
|
def resolve_line_count(self, info):
|
|
"""Calculate line count."""
|
|
if self.content:
|
|
return len(self.content.splitlines())
|
|
return 0
|
|
|
|
def resolve_has_front_matter(self, info):
|
|
"""Check if file has front matter."""
|
|
return bool(self.front_matter_raw)
|
|
|
|
|
|
class Schema(ObjectType):
|
|
"""GraphQL type for JSON schemas."""
|
|
id = Int(required=True, description="Unique identifier")
|
|
filename = String(required=True, description="Schema filename")
|
|
title = String(description="Schema title")
|
|
description = String(description="Schema description")
|
|
schema_content = JSONString(required=True, description="JSON schema content")
|
|
created_at = DateTime(description="Creation timestamp")
|
|
updated_at = DateTime(description="Last update timestamp")
|
|
|
|
# Computed fields
|
|
schema_version = String(description="JSON Schema version")
|
|
property_count = Int(description="Number of properties in schema")
|
|
|
|
def resolve_schema_version(self, info):
|
|
"""Extract schema version."""
|
|
if self.schema_content and isinstance(self.schema_content, dict):
|
|
return self.schema_content.get('$schema', 'Unknown')
|
|
return 'Unknown'
|
|
|
|
def resolve_property_count(self, info):
|
|
"""Count properties in schema."""
|
|
if (self.schema_content and
|
|
isinstance(self.schema_content, dict) and
|
|
'properties' in self.schema_content):
|
|
return len(self.schema_content['properties'])
|
|
return 0
|
|
|
|
|
|
class ASTNode(ObjectType):
|
|
"""GraphQL type for AST nodes."""
|
|
type = String(required=True, description="Node type")
|
|
value = String(description="Node value/content")
|
|
level = Int(description="Heading level (for heading nodes)")
|
|
children = List(lambda: ASTNode, description="Child nodes")
|
|
attrs = JSONString(description="Node attributes")
|
|
|
|
|
|
class AST(ObjectType):
|
|
"""GraphQL type for parsed AST."""
|
|
file_id = Int(description="Associated file ID")
|
|
filename = String(required=True, description="Source filename")
|
|
tree = List(ASTNode, description="AST tree structure")
|
|
metadata = JSONString(description="AST metadata")
|
|
|
|
# Statistics
|
|
heading_count = Int(description="Number of headings")
|
|
link_count = Int(description="Number of links")
|
|
image_count = Int(description="Number of images")
|
|
code_block_count = Int(description="Number of code blocks")
|
|
|
|
|
|
class DatabaseStats(ObjectType):
|
|
"""Database statistics."""
|
|
total_files = Int(description="Total number of markdown files")
|
|
total_schemas = Int(description="Total number of schemas")
|
|
total_size_bytes = Int(description="Total database size in bytes")
|
|
last_updated = DateTime(description="Last database update")
|
|
|
|
|
|
class SearchResult(ObjectType):
|
|
"""Search result union type."""
|
|
type = String(required=True, description="Result type (file, schema)")
|
|
score = graphene.Float(description="Search relevance score")
|
|
file = Field(MarkdownFile, description="Matched file (if type=file)")
|
|
schema = Field(Schema, description="Matched schema (if type=schema)")
|
|
highlight = String(description="Highlighted match text")
|
|
|
|
|
|
class Query(ObjectType):
|
|
"""Root GraphQL query type."""
|
|
|
|
# Single item queries
|
|
markdown_file = Field(
|
|
MarkdownFile,
|
|
id=Int(description="File ID"),
|
|
filename=String(description="File path"),
|
|
description="Get a specific markdown file"
|
|
)
|
|
|
|
schema = Field(
|
|
Schema,
|
|
id=Int(description="Schema ID"),
|
|
filename=String(description="Schema filename"),
|
|
description="Get a specific schema"
|
|
)
|
|
|
|
ast = Field(
|
|
AST,
|
|
file_id=Int(description="File ID"),
|
|
filename=String(description="File path"),
|
|
description="Get AST for a specific file"
|
|
)
|
|
|
|
# List queries
|
|
markdown_files = List(
|
|
MarkdownFile,
|
|
limit=Int(default_value=50, description="Maximum number of results"),
|
|
offset=Int(default_value=0, description="Offset for pagination"),
|
|
has_front_matter=graphene.Boolean(description="Filter by front matter presence"),
|
|
created_after=DateTime(description="Filter by creation date"),
|
|
description="List markdown files with optional filtering"
|
|
)
|
|
|
|
schemas = List(
|
|
Schema,
|
|
limit=Int(default_value=50, description="Maximum number of results"),
|
|
offset=Int(default_value=0, description="Offset for pagination"),
|
|
description="List all schemas"
|
|
)
|
|
|
|
# Search
|
|
search = List(
|
|
SearchResult,
|
|
query=String(required=True, description="Search query"),
|
|
type=String(description="Search type filter (file, schema, all)"),
|
|
limit=Int(default_value=20, description="Maximum number of results"),
|
|
description="Search across files and schemas"
|
|
)
|
|
|
|
# Statistics
|
|
database_stats = Field(
|
|
DatabaseStats,
|
|
description="Get database statistics"
|
|
)
|
|
|
|
# JSONPath queries for ASTs
|
|
ast_query = List(
|
|
JSONString,
|
|
file_id=Int(),
|
|
filename=String(),
|
|
jsonpath=String(required=True, description="JSONPath expression"),
|
|
description="Query AST using JSONPath expressions"
|
|
)
|
|
|
|
|
|
# Mutation Types for Write Operations
|
|
class AddMarkdownFilePayload(ObjectType):
|
|
"""Payload for addMarkdownFile mutation."""
|
|
markdown_file = Field(MarkdownFile, description="The created markdown file")
|
|
success = graphene.Boolean(description="Whether the operation was successful")
|
|
errors = List(String, description="List of validation errors")
|
|
|
|
|
|
class UpdateMarkdownFilePayload(ObjectType):
|
|
"""Payload for updateMarkdownFile mutation."""
|
|
markdown_file = Field(MarkdownFile, description="The updated markdown file")
|
|
success = graphene.Boolean(description="Whether the operation was successful")
|
|
errors = List(String, description="List of validation errors")
|
|
|
|
|
|
class AddSchemaPayload(ObjectType):
|
|
"""Payload for addSchema mutation."""
|
|
schema = Field(Schema, description="The created schema")
|
|
success = graphene.Boolean(description="Whether the operation was successful")
|
|
errors = List(String, description="List of validation errors")
|
|
|
|
|
|
class UpdateSchemaPayload(ObjectType):
|
|
"""Payload for updateSchema mutation."""
|
|
schema = Field(Schema, description="The updated schema")
|
|
success = graphene.Boolean(description="Whether the operation was successful")
|
|
errors = List(String, description="List of validation errors")
|
|
|
|
|
|
class DeleteSchemaPayload(ObjectType):
|
|
"""Payload for deleteSchema mutation."""
|
|
success = graphene.Boolean(description="Whether the operation was successful")
|
|
deleted_filename = String(description="The filename of the deleted schema")
|
|
errors = List(String, description="List of validation errors")
|
|
|
|
|
|
class Mutation(ObjectType):
|
|
"""Root GraphQL mutation type for write operations."""
|
|
|
|
# Markdown file mutations
|
|
add_markdown_file = Field(
|
|
AddMarkdownFilePayload,
|
|
filename=String(required=True, description="Filename for the markdown file"),
|
|
content=String(required=True, description="Markdown content including front matter"),
|
|
description="Add a new markdown file to the database"
|
|
)
|
|
|
|
update_markdown_file = Field(
|
|
UpdateMarkdownFilePayload,
|
|
id=Int(required=True, description="ID of the markdown file to update"),
|
|
content=String(description="New markdown content"),
|
|
description="Update an existing markdown file"
|
|
)
|
|
|
|
# Schema mutations
|
|
add_schema = Field(
|
|
AddSchemaPayload,
|
|
filename=String(required=True, description="Filename for the schema"),
|
|
schema_content=JSONString(required=True, description="JSON schema content"),
|
|
description="Add a new JSON schema to the database"
|
|
)
|
|
|
|
update_schema = Field(
|
|
UpdateSchemaPayload,
|
|
id=Int(required=True, description="ID of the schema to update"),
|
|
schema_content=JSONString(description="New JSON schema content"),
|
|
description="Update an existing JSON schema"
|
|
)
|
|
|
|
delete_schema = Field(
|
|
DeleteSchemaPayload,
|
|
filename=String(required=True, description="Filename of the schema to delete"),
|
|
description="Delete a JSON schema from the database"
|
|
)
|
|
|
|
def resolve_add_markdown_file(self, info, filename, content):
|
|
"""Add a new markdown file to the database."""
|
|
import json
|
|
from ..database import DatabaseManager
|
|
from .resolvers import get_default_database_path
|
|
|
|
try:
|
|
# Get database manager
|
|
db_manager = DatabaseManager(get_default_database_path())
|
|
|
|
# Store the file using the database manager
|
|
file_id = db_manager.store_markdown_file(filename, content)
|
|
|
|
if file_id:
|
|
# Retrieve the created file
|
|
import sqlite3
|
|
conn = sqlite3.connect(get_default_database_path())
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT * FROM markdown_files WHERE id = ?", (file_id,))
|
|
row = cursor.fetchone()
|
|
|
|
if row:
|
|
data = dict(zip([col[0] for col in cursor.description], row))
|
|
# Parse front matter JSON
|
|
if data['front_matter']:
|
|
try:
|
|
data['front_matter_raw'] = json.loads(data['front_matter'])
|
|
except json.JSONDecodeError:
|
|
data['front_matter_raw'] = {}
|
|
else:
|
|
data['front_matter_raw'] = {}
|
|
|
|
# Parse datetime strings to datetime objects
|
|
from datetime import datetime
|
|
if data.get('created_at'):
|
|
try:
|
|
data['created_at'] = datetime.fromisoformat(data['created_at'])
|
|
except (ValueError, TypeError):
|
|
data['created_at'] = None
|
|
|
|
conn.close()
|
|
return AddMarkdownFilePayload(
|
|
markdown_file=MarkdownFile(**data),
|
|
success=True,
|
|
errors=[]
|
|
)
|
|
conn.close()
|
|
|
|
return AddMarkdownFilePayload(
|
|
markdown_file=None,
|
|
success=False,
|
|
errors=["Failed to store markdown file"]
|
|
)
|
|
|
|
except Exception as e:
|
|
return AddMarkdownFilePayload(
|
|
markdown_file=None,
|
|
success=False,
|
|
errors=[str(e)]
|
|
)
|
|
|
|
def resolve_update_markdown_file(self, info, id, content=None):
|
|
"""Update an existing markdown file."""
|
|
import json
|
|
import sqlite3
|
|
from ..database import DatabaseManager
|
|
from .resolvers import get_default_database_path
|
|
|
|
try:
|
|
if not content:
|
|
return UpdateMarkdownFilePayload(
|
|
markdown_file=None,
|
|
success=False,
|
|
errors=["Content is required for update"]
|
|
)
|
|
|
|
db_path = get_default_database_path()
|
|
conn = sqlite3.connect(db_path)
|
|
cursor = conn.cursor()
|
|
|
|
# Check if file exists and get filename
|
|
cursor.execute("SELECT filename FROM markdown_files WHERE id = ?", (id,))
|
|
row = cursor.fetchone()
|
|
if not row:
|
|
conn.close()
|
|
return UpdateMarkdownFilePayload(
|
|
markdown_file=None,
|
|
success=False,
|
|
errors=[f"Markdown file with ID {id} not found"]
|
|
)
|
|
|
|
filename = row[0]
|
|
|
|
# Parse front matter and content for update
|
|
from ..frontmatter import FrontMatterParser
|
|
front_matter_parser = FrontMatterParser()
|
|
front_matter, markdown_content = front_matter_parser.parse(content)
|
|
front_matter_json = json.dumps(front_matter) if front_matter else '{}'
|
|
|
|
# Update the file directly with SQL
|
|
cursor.execute('''
|
|
UPDATE markdown_files
|
|
SET content = ?, front_matter = ?
|
|
WHERE id = ?
|
|
''', (markdown_content, front_matter_json, id))
|
|
|
|
conn.commit()
|
|
|
|
# Retrieve the updated file
|
|
cursor.execute("SELECT * FROM markdown_files WHERE id = ?", (id,))
|
|
row = cursor.fetchone()
|
|
|
|
if row:
|
|
data = dict(zip([col[0] for col in cursor.description], row))
|
|
# Parse front matter JSON
|
|
if data['front_matter']:
|
|
try:
|
|
data['front_matter_raw'] = json.loads(data['front_matter'])
|
|
except json.JSONDecodeError:
|
|
data['front_matter_raw'] = {}
|
|
else:
|
|
data['front_matter_raw'] = {}
|
|
|
|
# Parse datetime strings to datetime objects
|
|
from datetime import datetime
|
|
if data.get('created_at'):
|
|
try:
|
|
data['created_at'] = datetime.fromisoformat(data['created_at'])
|
|
except (ValueError, TypeError):
|
|
data['created_at'] = None
|
|
|
|
conn.close()
|
|
return UpdateMarkdownFilePayload(
|
|
markdown_file=MarkdownFile(**data),
|
|
success=True,
|
|
errors=[]
|
|
)
|
|
|
|
conn.close()
|
|
|
|
return UpdateMarkdownFilePayload(
|
|
markdown_file=None,
|
|
success=False,
|
|
errors=["Failed to update markdown file"]
|
|
)
|
|
|
|
except Exception as e:
|
|
return UpdateMarkdownFilePayload(
|
|
markdown_file=None,
|
|
success=False,
|
|
errors=[str(e)]
|
|
)
|
|
|
|
def resolve_add_schema(self, info, filename, schema_content):
|
|
"""Add a new JSON schema to the database."""
|
|
import json
|
|
import sqlite3
|
|
from ..database import DatabaseManager
|
|
from .resolvers import get_default_database_path
|
|
|
|
try:
|
|
# Get database manager
|
|
db_manager = DatabaseManager(get_default_database_path())
|
|
|
|
# Convert schema_content to JSON string if it's a dict
|
|
if isinstance(schema_content, dict):
|
|
schema_content_str = json.dumps(schema_content)
|
|
else:
|
|
schema_content_str = schema_content
|
|
|
|
# Store the schema using the database manager
|
|
schema_id = db_manager.store_schema_file(filename, schema_content_str)
|
|
|
|
if schema_id:
|
|
# Retrieve the created schema
|
|
conn = sqlite3.connect(get_default_database_path())
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT * FROM schemas WHERE id = ?", (schema_id,))
|
|
row = cursor.fetchone()
|
|
|
|
if row:
|
|
data = dict(zip([col[0] for col in cursor.description], row))
|
|
# Parse schema content JSON
|
|
if data['schema_content']:
|
|
try:
|
|
data['schema_content'] = json.loads(data['schema_content'])
|
|
except json.JSONDecodeError:
|
|
data['schema_content'] = {}
|
|
|
|
# Parse datetime strings to datetime objects
|
|
from datetime import datetime
|
|
for dt_field in ['created_at', 'updated_at']:
|
|
if data.get(dt_field):
|
|
try:
|
|
data[dt_field] = datetime.fromisoformat(data[dt_field])
|
|
except (ValueError, TypeError):
|
|
data[dt_field] = None
|
|
|
|
conn.close()
|
|
return AddSchemaPayload(
|
|
schema=Schema(**data),
|
|
success=True,
|
|
errors=[]
|
|
)
|
|
conn.close()
|
|
|
|
return AddSchemaPayload(
|
|
schema=None,
|
|
success=False,
|
|
errors=["Failed to store schema"]
|
|
)
|
|
|
|
except Exception as e:
|
|
return AddSchemaPayload(
|
|
schema=None,
|
|
success=False,
|
|
errors=[str(e)]
|
|
)
|
|
|
|
def resolve_update_schema(self, info, id, schema_content=None):
|
|
"""Update an existing JSON schema."""
|
|
import json
|
|
import sqlite3
|
|
from ..database import DatabaseManager
|
|
from .resolvers import get_default_database_path
|
|
|
|
try:
|
|
if not schema_content:
|
|
return UpdateSchemaPayload(
|
|
schema=None,
|
|
success=False,
|
|
errors=["Schema content is required for update"]
|
|
)
|
|
|
|
db_path = get_default_database_path()
|
|
conn = sqlite3.connect(db_path)
|
|
cursor = conn.cursor()
|
|
|
|
# Check if schema exists
|
|
cursor.execute("SELECT filename FROM schemas WHERE id = ?", (id,))
|
|
row = cursor.fetchone()
|
|
if not row:
|
|
conn.close()
|
|
return UpdateSchemaPayload(
|
|
schema=None,
|
|
success=False,
|
|
errors=[f"Schema with ID {id} not found"]
|
|
)
|
|
|
|
filename = row[0]
|
|
conn.close()
|
|
|
|
# Convert schema_content to JSON string if it's a dict
|
|
if isinstance(schema_content, dict):
|
|
schema_content_str = json.dumps(schema_content)
|
|
else:
|
|
schema_content_str = schema_content
|
|
|
|
# Update using store_schema_file
|
|
db_manager = DatabaseManager(db_path)
|
|
schema_id = db_manager.store_schema_file(filename, schema_content_str)
|
|
|
|
if schema_id:
|
|
# Retrieve the updated schema
|
|
conn = sqlite3.connect(db_path)
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT * FROM schemas WHERE filename = ?", (filename,))
|
|
row = cursor.fetchone()
|
|
|
|
if row:
|
|
data = dict(zip([col[0] for col in cursor.description], row))
|
|
# Parse schema content JSON
|
|
if data['schema_content']:
|
|
try:
|
|
data['schema_content'] = json.loads(data['schema_content'])
|
|
except json.JSONDecodeError:
|
|
data['schema_content'] = {}
|
|
|
|
# Parse datetime strings to datetime objects
|
|
from datetime import datetime
|
|
for dt_field in ['created_at', 'updated_at']:
|
|
if data.get(dt_field):
|
|
try:
|
|
data[dt_field] = datetime.fromisoformat(data[dt_field])
|
|
except (ValueError, TypeError):
|
|
data[dt_field] = None
|
|
|
|
conn.close()
|
|
return UpdateSchemaPayload(
|
|
schema=Schema(**data),
|
|
success=True,
|
|
errors=[]
|
|
)
|
|
conn.close()
|
|
|
|
return UpdateSchemaPayload(
|
|
schema=None,
|
|
success=False,
|
|
errors=["Failed to update schema"]
|
|
)
|
|
|
|
except Exception as e:
|
|
return UpdateSchemaPayload(
|
|
schema=None,
|
|
success=False,
|
|
errors=[str(e)]
|
|
)
|
|
|
|
def resolve_delete_schema(self, info, filename):
|
|
"""Delete a JSON schema from the database."""
|
|
from ..database import DatabaseManager
|
|
from .resolvers import get_default_database_path
|
|
|
|
try:
|
|
# Get database manager
|
|
db_manager = DatabaseManager(get_default_database_path())
|
|
|
|
# Delete using the database manager
|
|
success = db_manager.delete_schema_file(filename)
|
|
|
|
if success:
|
|
return DeleteSchemaPayload(
|
|
success=True,
|
|
deleted_filename=filename,
|
|
errors=[]
|
|
)
|
|
else:
|
|
return DeleteSchemaPayload(
|
|
success=False,
|
|
deleted_filename=None,
|
|
errors=[f"Failed to delete schema: {filename}"]
|
|
)
|
|
|
|
except Exception as e:
|
|
return DeleteSchemaPayload(
|
|
success=False,
|
|
deleted_filename=None,
|
|
errors=[str(e)]
|
|
)
|
|
|
|
|
|
# Create the schema with both Query and Mutation
|
|
schema = graphene.Schema(query=Query, mutation=Mutation) |