Files
markitect-main/markitect/graphql/resolvers.py
tegwick 2dd1704e51 feat: implement comprehensive GraphQL read interface (issue #9)
Adds a complete GraphQL API for querying MarkiTect database content including:

CORE FEATURES:
- Type-safe GraphQL schema with comprehensive field definitions
- Full database access: markdown files, schemas, ASTs, and metadata
- Advanced search capabilities with relevance scoring
- Pagination support for efficient data access
- Real-time schema introspection and development tools

IMPLEMENTATION:
- GraphQL schema definition with 6 core types (MarkdownFile, Schema, AST, etc.)
- Complete resolver implementation with database integration
- Flask-based GraphQL server with CORS support
- GraphQL Playground for interactive development
- Health check and schema introspection endpoints

CLI INTEGRATION:
- graphql-serve: Start GraphQL server with customizable options
- graphql-query: Execute queries from command line (local/remote)
- graphql-schema: Retrieve schema definition in SDL/JSON format
- graphql-examples: Comprehensive usage examples and documentation

API FEATURES:
- Single item queries (by ID or filename)
- List queries with filtering and pagination
- Full-text search across files and schemas
- Database statistics and analytics
- AST querying with JSONPath expressions
- Computed fields (word count, line count, etc.)

TESTING:
- Comprehensive test suite with 38 passing tests
- Unit tests for schema, resolvers, server, and client
- Integration tests for query execution
- Error handling and edge case coverage
- Mock and fixture support for isolated testing

DOCUMENTATION:
- Complete API documentation with examples
- Usage guide for all CLI commands
- Programming examples in Python and JavaScript
- Performance optimization guidelines
- Troubleshooting and security considerations

The GraphQL interface enables developers to build rich applications on top of
MarkiTect data with flexible, efficient querying capabilities.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 11:53:53 +02:00

449 lines
14 KiB
Python

"""
GraphQL resolvers for MarkiTect data.
Implements the resolver functions that fetch data from MarkiTect's
database and services to fulfill GraphQL queries.
"""
import json
import sqlite3
import os
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Dict, Any, Union
from jsonpath_ng import parse as jsonpath_parse
from ..database import DatabaseManager
from ..ast_service import ASTService
from .schema import (
MarkdownFile, Schema, AST, ASTNode, DatabaseStats,
SearchResult, Query as QueryType
)
class MarkiTectResolver:
    """Shared database plumbing used by the GraphQL resolvers."""

    def __init__(self, db_path: str):
        """Remember ``db_path`` and create the database manager and AST service."""
        self.db_path = db_path
        self.db_manager = DatabaseManager(db_path)
        self.ast_service = ASTService()

    def get_connection(self):
        """Open and return a new SQLite connection to ``self.db_path``."""
        connection = sqlite3.connect(self.db_path)
        return connection

    def row_to_dict(self, cursor, row):
        """Pair each column name from ``cursor.description`` with its value in ``row``."""
        return {
            column[0]: value
            for column, value in zip(cursor.description, row)
        }
class Query(QueryType):
    """Concrete resolver implementation for the GraphQL ``Query`` type."""

    def __init__(self):
        """Create the backing resolver pointed at the default database."""
        # The path is not configurable here; it always comes from
        # get_default_database_path().
        default_db_path = get_default_database_path()
        self.resolver = MarkiTectResolver(default_db_path)
def resolve_markdown_file(self, info, id=None, filename=None):
    """Resolve a single markdown file by ``id`` or, failing that, by ``filename``.

    Args:
        info: GraphQL resolve info (unused).
        id: Value matched against ``markdown_files.id``; used when truthy.
        filename: Value matched against ``markdown_files.filename``; used
            only when ``id`` is falsy.

    Returns:
        A ``MarkdownFile`` built from the matching row (with an extra
        ``front_matter_raw`` key holding the decoded front matter, or ``{}``
        when it is empty or not valid JSON), or ``None`` when no selector is
        given or no row matches.
    """
    # Pick the lookup before opening a connection, so the "no selector"
    # case cannot leave one open.
    if id:
        sql, params = "SELECT * FROM markdown_files WHERE id = ?", (id,)
    elif filename:
        sql, params = "SELECT * FROM markdown_files WHERE filename = ?", (filename,)
    else:
        return None

    conn = self.resolver.get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(sql, params)
        row = cursor.fetchone()
        if row is None:
            return None
        # Map the row while the connection is still open.
        data = self.resolver.row_to_dict(cursor, row)
    finally:
        # Always release the connection, including when the query raises.
        conn.close()

    front_matter_raw = {}
    if data['front_matter']:
        try:
            front_matter_raw = json.loads(data['front_matter'])
        except json.JSONDecodeError:
            # Malformed front matter is exposed as an empty mapping.
            front_matter_raw = {}
    data['front_matter_raw'] = front_matter_raw
    return MarkdownFile(**data)
def resolve_schema(self, info, id=None, filename=None):
    """Resolve a single schema by ``id`` or, failing that, by ``filename``.

    Args:
        info: GraphQL resolve info (unused).
        id: Value matched against ``schemas.id``; used when truthy.
        filename: Value matched against ``schemas.filename``; used only
            when ``id`` is falsy.

    Returns:
        A ``Schema`` built from the matching row, with ``schema_content``
        decoded from JSON (``{}`` when it is not valid JSON; left untouched
        when empty), or ``None`` when no selector is given or no row matches.
    """
    # Pick the lookup before opening a connection, so the "no selector"
    # case cannot leave one open.
    if id:
        sql, params = "SELECT * FROM schemas WHERE id = ?", (id,)
    elif filename:
        sql, params = "SELECT * FROM schemas WHERE filename = ?", (filename,)
    else:
        return None

    conn = self.resolver.get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(sql, params)
        row = cursor.fetchone()
        if row is None:
            return None
        # Map the row while the connection is still open.
        data = self.resolver.row_to_dict(cursor, row)
    finally:
        # Always release the connection, including when the query raises.
        conn.close()

    if data['schema_content']:
        try:
            data['schema_content'] = json.loads(data['schema_content'])
        except json.JSONDecodeError:
            # Malformed schema content is exposed as an empty mapping.
            data['schema_content'] = {}
    return Schema(**data)
def resolve_ast(self, info, file_id=None, filename=None):
    """Resolve the AST of one markdown file.

    Args:
        info: GraphQL resolve info (unused).
        file_id: When truthy, the filename is looked up in
            ``markdown_files`` by this id and replaces ``filename``.
        filename: Path handed to the AST service when no ``file_id`` is given.

    Returns:
        An ``AST`` built from the AST service's result, or ``None`` when no
        selector is given, the id is unknown, the stored filename is empty,
        the service does not report success, or building the AST raises.
    """
    import logging

    if not file_id and not filename:
        return None

    if file_id:
        conn = self.resolver.get_connection()
        try:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT filename FROM markdown_files WHERE id = ?",
                (file_id,)
            )
            row = cursor.fetchone()
        finally:
            # Release the connection even if the lookup raises.
            conn.close()
        if not row:
            return None
        filename = row[0]

    if not filename:
        return None

    file_path = Path(filename)
    try:
        ast_result = self.resolver.ast_service.display_ast(file_path, "json")
        if not ast_result.get('success'):
            return None
        ast_data = ast_result.get('ast', {})
        return AST(
            file_id=file_id,
            filename=filename,
            tree=self._convert_ast_nodes(ast_data),
            metadata=ast_result.get('metadata', {}),
            heading_count=self._count_nodes_by_type(ast_data, 'heading'),
            link_count=self._count_nodes_by_type(ast_data, 'link'),
            image_count=self._count_nodes_by_type(ast_data, 'image'),
            code_block_count=self._count_nodes_by_type(ast_data, 'code')
        )
    except Exception:
        # Still best-effort (callers get None), but the failure is now
        # recorded instead of being silently discarded.
        logging.getLogger(__name__).exception(
            "Failed to build AST for %s", filename
        )
        return None
def resolve_markdown_files(self, info, limit=50, offset=0, has_front_matter=None, created_after=None):
    """Resolve a page of markdown files, newest ``created_at`` first.

    Args:
        info: GraphQL resolve info (unused).
        limit: Maximum number of rows, bound to the SQL ``LIMIT``.
        offset: Number of rows to skip, bound to the SQL ``OFFSET``.
        has_front_matter: ``True`` keeps only rows with non-empty front
            matter, ``False`` only rows without it, ``None`` disables the
            filter.
        created_after: When truthy, its ``isoformat()`` value is used as an
            exclusive lower bound on ``created_at``.

    Returns:
        A list of ``MarkdownFile`` objects, each with a ``front_matter_raw``
        key holding the decoded front matter (``{}`` when it is empty or not
        valid JSON).
    """
    query = "SELECT * FROM markdown_files WHERE 1=1"
    params = []

    if has_front_matter is not None:
        if has_front_matter:
            query += " AND front_matter IS NOT NULL AND front_matter != ''"
        else:
            query += " AND (front_matter IS NULL OR front_matter = '')"

    if created_after:
        query += " AND created_at > ?"
        params.append(created_after.isoformat())

    query += " ORDER BY created_at DESC LIMIT ? OFFSET ?"
    params.extend([limit, offset])

    conn = self.resolver.get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(query, params)
        # Map rows while the connection is still open.
        records = [
            self.resolver.row_to_dict(cursor, row)
            for row in cursor.fetchall()
        ]
    finally:
        # Always release the connection, including when the query raises.
        conn.close()

    files = []
    for data in records:
        front_matter_raw = {}
        if data['front_matter']:
            try:
                front_matter_raw = json.loads(data['front_matter'])
            except json.JSONDecodeError:
                # Malformed front matter is exposed as an empty mapping.
                front_matter_raw = {}
        data['front_matter_raw'] = front_matter_raw
        files.append(MarkdownFile(**data))
    return files
def resolve_schemas(self, info, limit=50, offset=0):
    """Resolve a page of schemas, newest ``created_at`` first.

    Args:
        info: GraphQL resolve info (unused).
        limit: Maximum number of rows, bound to the SQL ``LIMIT``.
        offset: Number of rows to skip, bound to the SQL ``OFFSET``.

    Returns:
        A list of ``Schema`` objects whose ``schema_content`` has been
        decoded from JSON (``{}`` when it is not valid JSON; left untouched
        when empty).
    """
    conn = self.resolver.get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT * FROM schemas ORDER BY created_at DESC LIMIT ? OFFSET ?",
            (limit, offset)
        )
        # Map rows while the connection is still open.
        records = [
            self.resolver.row_to_dict(cursor, row)
            for row in cursor.fetchall()
        ]
    finally:
        # Always release the connection, including when the query raises.
        conn.close()

    schemas = []
    for data in records:
        if data['schema_content']:
            try:
                data['schema_content'] = json.loads(data['schema_content'])
            except json.JSONDecodeError:
                # Malformed schema content is exposed as an empty mapping.
                data['schema_content'] = {}
        schemas.append(Schema(**data))
    return schemas
def resolve_search(self, info, query, type="all", limit=20):
    """Resolve a substring search over markdown files and/or schemas.

    Args:
        info: GraphQL resolve info (unused).
        query: Text searched for with SQL ``LIKE '%query%'``.
        type: ``"file"`` searches markdown files only, ``"schema"`` searches
            schemas only, ``"all"`` searches both; any other value searches
            nothing.
        limit: Per-table SQL ``LIMIT`` and the cap on the combined result.

    Returns:
        ``SearchResult`` objects sorted by descending score and truncated to
        ``limit``. Every hit starts at 1.0; a file gains 0.5 when its
        filename contains the query and 0.3 when its content does; a schema
        gains 0.5 when its title contains the query (all case-insensitive).
    """
    results = []
    pattern = f"%{query}%"
    needle = query.lower()

    conn = self.resolver.get_connection()
    try:
        cursor = conn.cursor()

        # Search in markdown files
        if type in ("all", "file"):
            cursor.execute("""
                SELECT *, 'file' as result_type FROM markdown_files
                WHERE filename LIKE ? OR content LIKE ?
                ORDER BY
                    CASE WHEN filename LIKE ? THEN 1 ELSE 2 END,
                    created_at DESC
                LIMIT ?
            """, (pattern, pattern, pattern, limit))

            for row in cursor.fetchall():
                data = self.resolver.row_to_dict(cursor, row)

                front_matter_raw = {}
                if data['front_matter']:
                    try:
                        front_matter_raw = json.loads(data['front_matter'])
                    except json.JSONDecodeError:
                        front_matter_raw = {}
                data['front_matter_raw'] = front_matter_raw

                # 'result_type' is a synthetic column, not a MarkdownFile field.
                file_data = {k: v for k, v in data.items() if k != 'result_type'}

                score = 1.0
                # A row can match on content alone, so tolerate a NULL filename.
                if needle in (data['filename'] or '').lower():
                    score += 0.5
                if data['content'] and needle in data['content'].lower():
                    score += 0.3

                results.append(SearchResult(
                    type="file",
                    score=score,
                    file=MarkdownFile(**file_data),
                    highlight=self._extract_highlight(data.get('content', ''), query)
                ))

        # Search in schemas
        if type in ("all", "schema"):
            cursor.execute("""
                SELECT *, 'schema' as result_type FROM schemas
                WHERE filename LIKE ? OR title LIKE ? OR description LIKE ?
                ORDER BY created_at DESC
                LIMIT ?
            """, (pattern, pattern, pattern, limit))

            for row in cursor.fetchall():
                data = self.resolver.row_to_dict(cursor, row)

                if data['schema_content']:
                    try:
                        data['schema_content'] = json.loads(data['schema_content'])
                    except json.JSONDecodeError:
                        data['schema_content'] = {}

                # 'result_type' is a synthetic column, not a Schema field.
                schema_data = {k: v for k, v in data.items() if k != 'result_type'}

                score = 1.0
                # A NULL title arrives as None (the key exists), so
                # .get('title', '') alone would not protect .lower().
                if needle in (data.get('title') or '').lower():
                    score += 0.5

                results.append(SearchResult(
                    type="schema",
                    score=score,
                    schema=Schema(**schema_data),
                    highlight=data.get('title', '') or data.get('filename', '')
                ))
    finally:
        # Always release the connection, including when a query or row
        # conversion raises.
        conn.close()

    # Sort by score and limit
    results.sort(key=lambda result: result.score, reverse=True)
    return results[:limit]
def resolve_database_stats(self, info):
    """Resolve aggregate statistics about the database.

    Args:
        info: GraphQL resolve info (unused).

    Returns:
        A ``DatabaseStats`` with the row counts of ``markdown_files`` and
        ``schemas``, the size in bytes of the database file (0 when it
        cannot be stat'ed), and the most recent ``created_at`` across both
        tables parsed as a ``datetime`` (``None`` when there are no rows or
        the value cannot be parsed).
    """
    conn = self.resolver.get_connection()
    try:
        cursor = conn.cursor()

        cursor.execute("SELECT COUNT(*) FROM markdown_files")
        total_files = cursor.fetchone()[0]

        cursor.execute("SELECT COUNT(*) FROM schemas")
        total_schemas = cursor.fetchone()[0]

        cursor.execute("""
            SELECT MAX(created_at) FROM (
                SELECT created_at FROM markdown_files
                UNION ALL
                SELECT created_at FROM schemas
            )
        """)
        last_updated_str = cursor.fetchone()[0]
    finally:
        # Always release the connection, including when a query raises
        # (for example because a table is missing).
        conn.close()

    # Ask for the size directly instead of checking existence first, so a
    # file that disappears in between cannot raise.
    try:
        db_size = os.path.getsize(self.resolver.db_path)
    except OSError:
        db_size = 0

    last_updated = None
    if last_updated_str:
        try:
            last_updated = datetime.fromisoformat(last_updated_str)
        except (ValueError, TypeError):
            # Unparseable or non-string timestamps are reported as unknown.
            last_updated = None

    return DatabaseStats(
        total_files=total_files,
        total_schemas=total_schemas,
        total_size_bytes=db_size,
        last_updated=last_updated
    )
def resolve_ast_query(self, info, jsonpath, file_id=None, filename=None):
    """Resolve a JSONPath query against a file's AST metadata.

    Args:
        info: GraphQL resolve info, forwarded to ``resolve_ast``.
        jsonpath: JSONPath expression to evaluate.
        file_id: Forwarded to ``resolve_ast`` to select the file.
        filename: Forwarded to ``resolve_ast`` to select the file.

    Returns:
        The list of matched values. An empty list is returned when no
        selector is given, no AST (or no AST metadata) is available, or the
        expression cannot be parsed or evaluated.
    """
    import logging

    if not file_id and not filename:
        return []

    ast = self.resolve_ast(info, file_id=file_id, filename=filename)
    if not ast or not ast.metadata:
        return []

    try:
        jsonpath_expr = jsonpath_parse(jsonpath)
        # The expression is evaluated against ast.metadata, not ast.tree.
        matches = jsonpath_expr.find(ast.metadata)
        return [match.value for match in matches]
    except Exception:
        # Still best-effort (callers get []), but log the failure so a bad
        # expression can be told apart from "no matches".
        logging.getLogger(__name__).exception(
            "JSONPath query %r failed", jsonpath
        )
        return []
def _convert_ast_nodes(self, ast_data):
"""Convert AST data to GraphQL ASTNode format."""
if not ast_data or not isinstance(ast_data, dict):
return []
nodes = []
if 'children' in ast_data:
for child in ast_data['children']:
node = ASTNode(
type=child.get('type', 'unknown'),
value=child.get('value'),
level=child.get('depth'),
attrs=child,
children=self._convert_ast_nodes(child) if 'children' in child else []
)
nodes.append(node)
return nodes
def _count_nodes_by_type(self, ast_data, node_type):
"""Count nodes of specific type in AST."""
if not ast_data or not isinstance(ast_data, dict):
return 0
count = 0
if ast_data.get('type') == node_type:
count += 1
if 'children' in ast_data:
for child in ast_data['children']:
count += self._count_nodes_by_type(child, node_type)
return count
def _extract_highlight(self, content, query, context_length=100):
"""Extract highlighted snippet from content."""
if not content or not query:
return ""
query_lower = query.lower()
content_lower = content.lower()
index = content_lower.find(query_lower)
if index == -1:
return content[:context_length] + "..." if len(content) > context_length else content
start = max(0, index - context_length // 2)
end = min(len(content), index + len(query) + context_length // 2)
snippet = content[start:end]
if start > 0:
snippet = "..." + snippet
if end < len(content):
snippet = snippet + "..."
return snippet
def get_default_database_path():
    """Return the database path used by the GraphQL resolvers.

    The ``MARKITECT_DB`` environment variable wins whenever it is set (even
    to an empty string). Otherwise the ``.markitect`` directory under the
    user's home is created if needed and the path to ``markitect.db`` inside
    it is returned.
    """
    configured = os.environ.get('MARKITECT_DB')
    if configured is not None:
        return configured

    markitect_home = Path.home().joinpath('.markitect')
    markitect_home.mkdir(exist_ok=True)
    return str(markitect_home.joinpath('markitect.db'))