feat: implement comprehensive query paradigm zoo system (issue #62)

- Created extensible BaseQueryParadigm interface with standardized QueryResult format
- Implemented QueryParadigmRegistry for paradigm discovery and management
- Added 5 working paradigms: SQL, FTS, GraphQL, JSONPath, Natural Language
- Documented 9 additional paradigms: QBE, Batch Manipulation, Visual Query Builder, REST API, NoSQL, UNIX Pipeline, XPath/XQuery, RAG, Data Transformation
- Integrated full CLI interface: list, search, show, exec, categories commands
- Added comprehensive test suite with 23 test cases covering all components
- Auto-registration system enables easy addition of new paradigms
- Organized paradigms by category (structural, textual, semantic, visual, procedural, network) and complexity (beginner, intermediate, advanced)

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-10-03 23:06:57 +02:00
parent 1d13cbb355
commit 5143864a86
21 changed files with 3659 additions and 0 deletions

View File

@@ -6364,6 +6364,191 @@ cli.add_command(wishlist_group)
# Register issue management commands on the root CLI group
cli.add_command(issues_group)
# Query Paradigm Commands - Issue #62
@click.group()
def paradigms():
    """Discover and explore different query paradigms in MarkiTect."""
    # Pure container group: it has no behavior of its own, subcommands are
    # attached with @paradigms.command(). The docstring above doubles as the
    # group's --help text, so it is user-facing.
# The CLI name stays "list", but the Python function gets its own name so the
# builtin ``list`` is not shadowed for every other function in this module.
@paradigms.command(name='list')
@pass_config
def paradigms_list(config):
    """List all available query paradigms."""
    from .query_paradigms.registry import registry

    # Paradigms with a working execute(); all others are documentation-only.
    implemented = {'SQL', 'FTS', 'GraphQL', 'JSONPath', 'Natural Language'}

    available = registry.list_all()
    print(f"📚 MarkiTect Query Paradigms ({len(available)} available)")
    print("=" * 50)

    # Group by category; a dict keeps the first-seen category order.
    by_category = {}
    for paradigm in available:
        by_category.setdefault(paradigm.category, []).append(paradigm)

    for category, members in by_category.items():
        print(f"\n🏷️ {category.upper()} PARADIGMS")
        print("-" * 30)
        for paradigm in members:
            status = "✅ IMPLEMENTED" if paradigm.name in implemented else "📋 DOCUMENTED"
            print(f" {status} {paradigm.name} ({paradigm.complexity})")
            print(f" {paradigm.description}")
            print()
@paradigms.command()
@click.argument('query')
@pass_config
def search(config, query):
    """Search paradigms by name or description."""
    from .query_paradigms.registry import registry

    matches = registry.search_paradigms(query)

    # Nothing matched: say so and stop.
    if not matches:
        print(f"❌ No paradigms found matching '{query}'")
        return

    print(f"🔍 Search results for '{query}' ({len(matches)} found)")
    print("=" * 40)

    for hit in matches:
        # Only these paradigms have a working backend.
        if hit.name in ['SQL', 'FTS', 'GraphQL', 'JSONPath', 'Natural Language']:
            status = "✅ IMPLEMENTED"
        else:
            status = "📋 DOCUMENTED"
        print(f" {status} {hit.name} ({hit.category}, {hit.complexity})")
        print(f" {hit.description}")
        print()
@paradigms.command()
@click.argument('name')
@pass_config
def show(config, name):
    """Show detailed information about a specific paradigm."""
    from .query_paradigms.registry import registry

    selected = registry.get(name)
    if not selected:
        # Unknown name: report it and list the valid choices instead.
        print(f"❌ Paradigm '{name}' not found.")
        print("\nAvailable paradigms:")
        for known in registry.list_all():
            print(f" - {known.name}")
        return

    # Only these paradigms have a working backend.
    if selected.name in ['SQL', 'FTS', 'GraphQL', 'JSONPath', 'Natural Language']:
        status = "✅ IMPLEMENTED"
    else:
        status = "📋 DOCUMENTED"

    # Header block
    print(f"🔍 {selected.name} Query Paradigm")
    print("=" * (len(selected.name) + 20))
    print(f"Status: {status}")
    print(f"Category: {selected.category}")
    print(f"Complexity: {selected.complexity}")
    print(f"Description: {selected.description}")
    print()

    # Syntax reference supplied by the paradigm itself
    print("📝 Syntax Help:")
    print("-" * 15)
    print(selected.get_syntax_help())
    print()

    # Numbered example queries
    print("💡 Examples:")
    print("-" * 12)
    for number, sample in enumerate(selected.get_examples(), start=1):
        print(f"{number}. {sample['name']}")
        print(f" {sample['description']}")
        print(f" Query: {sample['query']}")
        print()
# Registered under the CLI name "exec"; the function itself is named
# differently so the builtin ``exec`` is not shadowed in this module.
@paradigms.command(name='exec')
@click.argument('paradigm_name')
@click.argument('query')
@click.option('--config-data', type=str, help='JSON configuration for the query')
@pass_config
def paradigms_exec(config, paradigm_name, query, config_data):
    """Execute a query using specified paradigm."""
    from .query_paradigms.registry import registry
    import json

    paradigm = registry.get(paradigm_name)
    if not paradigm:
        print(f"❌ Paradigm '{paradigm_name}' not found.")
        return

    # Parse config if provided
    query_config = {}
    if config_data:
        try:
            query_config = json.loads(config_data)
        except json.JSONDecodeError:
            print("❌ Invalid JSON in config-data parameter")
            return
        # Valid JSON that is not an object (a list, a number, ...) cannot hold
        # the db_path entry added below and would raise a TypeError there.
        if not isinstance(query_config, dict):
            print("❌ config-data must be a JSON object")
            return

    # Add database path from global config
    query_config['db_path'] = get_database_path(config)

    # Validate query first so obviously malformed input never reaches execute()
    valid, error = paradigm.validate_query(query)
    if not valid:
        print(f"❌ Invalid query: {error}")
        return

    print(f"🚀 Executing {paradigm.name} query...")
    print(f"Query: {query}")
    if config_data:
        print(f"Config: {query_config}")
    print()

    try:
        result = paradigm.execute(query, query_config)
        print(f"⏱️ Execution time: {result.execution_time_ms:.2f}ms")
        print(f"📊 Result count: {result.result_count}")
        print(f"✅ Success: {result.success}")
        if result.error_message:
            print(f"❌ Error: {result.error_message}")
        if result.metadata:
            print("\n📋 Metadata:")
            for key, value in result.metadata.items():
                print(f" {key}: {value}")
        if result.results:
            print("\n📄 Results:")
            # Show only the first 5 rows to keep terminal output short
            for i, row in enumerate(result.results[:5], 1):
                print(f" {i}. {row}")
            if len(result.results) > 5:
                print(f" ... and {len(result.results) - 5} more results")
    except Exception as e:
        # CLI boundary: report any paradigm failure instead of a traceback
        print(f"❌ Execution error: {e}")
@paradigms.command()
@pass_config
def categories(config):
    """List all available paradigm categories."""
    from .query_paradigms.registry import registry

    category_names = registry.get_categories()
    print("📂 Available Categories:")
    # Alphabetical order, one line per category with its paradigm count
    for category_name in sorted(category_names):
        members = registry.list_by_category(category_name)
        print(f" {category_name}: {len(members)} paradigms")
# Register paradigms commands (the `paradigms` group and its subcommands)
# on the root CLI group
cli.add_command(paradigms)
# Make cli function available as main entry point
main = cli

View File

@@ -0,0 +1,17 @@
"""
Query Paradigms - A unified interface to explore different query approaches in MarkiTect.
This module provides a "zoo" of query paradigms that demonstrates various ways
to query and interact with MarkiTect data, helping users discover the most
appropriate approach for their needs.
"""
from .registry import QueryParadigmRegistry
from .base import BaseQueryParadigm, QueryResult
# Star import of the paradigms subpackage, presumably kept so that importing
# this package also imports the paradigm implementations — TODO confirm.
from .paradigms import *
# Public names of this package.
__all__ = [
    'QueryParadigmRegistry',
    'BaseQueryParadigm',
    'QueryResult'
]

View File

@@ -0,0 +1,77 @@
"""
Base classes for query paradigms.
"""
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional, Union
from dataclasses import dataclass
from datetime import datetime
@dataclass
class QueryResult:
    """Uniform container returned by every paradigm's ``execute()``.

    Field order is part of the interface because instances may be built
    positionally.
    """

    # Name of the paradigm that produced this result.
    paradigm: str
    # The query text exactly as it was submitted.
    query: str
    # Elapsed execution time in milliseconds.
    execution_time_ms: float
    # Number of entries reported for ``results``.
    result_count: int
    # Result rows, one dict per row.
    results: List[Dict[str, Any]]
    # Paradigm-specific extra information about the execution.
    metadata: Dict[str, Any]
    # True when the query ran without error.
    success: bool
    # Human-readable failure description; None when there is none.
    error_message: Optional[str] = None
class BaseQueryParadigm(ABC):
    """Abstract interface that every query paradigm implements.

    Subclasses must provide the four descriptive properties plus
    ``execute``, ``get_examples`` and ``validate_query``. The remaining
    methods have defaults that report "no help" / "no translation".
    """

    # --- descriptive properties -------------------------------------------

    @property
    @abstractmethod
    def name(self) -> str:
        """Return the human-readable paradigm name."""

    @property
    @abstractmethod
    def description(self) -> str:
        """Return a short description of what the paradigm does."""

    @property
    @abstractmethod
    def category(self) -> str:
        """Return the category, e.g. structural, textual, semantic, procedural."""

    @property
    @abstractmethod
    def complexity(self) -> str:
        """Return the complexity level: beginner, intermediate or advanced."""

    # --- required behavior ------------------------------------------------

    @abstractmethod
    def execute(self, query: str, config: Dict[str, Any] = None) -> QueryResult:
        """Run ``query`` with optional ``config`` and return a QueryResult."""

    @abstractmethod
    def get_examples(self) -> List[Dict[str, str]]:
        """Return example queries as a list of string-to-string dicts."""

    @abstractmethod
    def validate_query(self, query: str) -> tuple[bool, Optional[str]]:
        """Return ``(is_valid, error_message)`` for ``query``."""

    # --- optional behavior with defaults ----------------------------------

    def get_syntax_help(self) -> str:
        """Return syntax help text; the default is a placeholder message."""
        return f"{self.name} syntax help not yet implemented."

    def can_translate_from(self, other_paradigm: str) -> bool:
        """Tell whether queries of ``other_paradigm`` can be translated; default no."""
        return False

    def translate_query(self, query: str, from_paradigm: str) -> Optional[str]:
        """Translate ``query`` from ``from_paradigm``; default is None (unsupported)."""
        return None

View File

@@ -0,0 +1,242 @@
"""
CLI interface for query paradigm discovery and interaction.
"""
import argparse
import json
import sys
from typing import Dict, Any
from .registry import registry
def list_paradigms():
    """Print every registered paradigm, grouped under its category heading."""
    # Paradigm names that have a working backend.
    implemented_names = ['SQL', 'FTS', 'GraphQL', 'JSONPath', 'Natural Language']

    everything = registry.list_all()
    print(f"📚 MarkiTect Query Paradigms ({len(everything)} available)")
    print("=" * 50)

    # Bucket paradigms per category, keeping first-seen category order.
    grouped = {}
    for entry in everything:
        grouped.setdefault(entry.category, []).append(entry)

    for heading, members in grouped.items():
        print(f"\n🏷️ {heading.upper()} PARADIGMS")
        print("-" * 30)
        for entry in members:
            if entry.name in implemented_names:
                status = "✅ IMPLEMENTED"
            else:
                status = "📋 DOCUMENTED"
            print(f" {status} {entry.name} ({entry.complexity})")
            print(f" {entry.description}")
            print()
def show_paradigm_details(name: str):
    """Print status, metadata, syntax help and examples for one paradigm.

    If ``name`` is not registered, an error and the list of known paradigm
    names are printed instead.
    """
    selected = registry.get(name)
    if not selected:
        print(f"❌ Paradigm '{name}' not found.")
        print("\nAvailable paradigms:")
        for known in registry.list_all():
            print(f" - {known.name}")
        return

    # Only these paradigms have a working backend.
    if selected.name in ['SQL', 'FTS', 'GraphQL', 'JSONPath', 'Natural Language']:
        status = "✅ IMPLEMENTED"
    else:
        status = "📋 DOCUMENTED"

    # Header block
    print(f"🔍 {selected.name} Query Paradigm")
    print("=" * (len(selected.name) + 20))
    print(f"Status: {status}")
    print(f"Category: {selected.category}")
    print(f"Complexity: {selected.complexity}")
    print(f"Description: {selected.description}")
    print()

    # Syntax reference supplied by the paradigm itself
    print("📝 Syntax Help:")
    print("-" * 15)
    print(selected.get_syntax_help())
    print()

    # Numbered example queries
    print("💡 Examples:")
    print("-" * 12)
    for number, sample in enumerate(selected.get_examples(), start=1):
        print(f"{number}. {sample['name']}")
        print(f" {sample['description']}")
        print(f" Query: {sample['query']}")
        print()
def search_paradigms(query: str):
    """Print the paradigms the registry returns for the search text ``query``."""
    matches = registry.search_paradigms(query)

    # Nothing matched: say so and stop.
    if not matches:
        print(f"❌ No paradigms found matching '{query}'")
        return

    print(f"🔍 Search results for '{query}' ({len(matches)} found)")
    print("=" * 40)

    for hit in matches:
        # Only these paradigms have a working backend.
        if hit.name in ['SQL', 'FTS', 'GraphQL', 'JSONPath', 'Natural Language']:
            status = "✅ IMPLEMENTED"
        else:
            status = "📋 DOCUMENTED"
        print(f" {status} {hit.name} ({hit.category}, {hit.complexity})")
        print(f" {hit.description}")
        print()
def execute_query(paradigm_name: str, query: str, config_str: str = None):
    """Execute a query using specified paradigm and print the outcome.

    Args:
        paradigm_name: Name used to look the paradigm up in the registry.
        query: Query text handed to the paradigm.
        config_str: Optional JSON object (as text) with paradigm settings.

    Every failure (unknown paradigm, bad config, invalid query, execution
    error) is reported on stdout; nothing is raised or returned.
    """
    paradigm = registry.get(paradigm_name)
    if not paradigm:
        print(f"❌ Paradigm '{paradigm_name}' not found.")
        return

    # Parse config if provided
    config = {}
    if config_str:
        try:
            config = json.loads(config_str)
        except json.JSONDecodeError:
            print("❌ Invalid JSON in config parameter")
            return
        # Paradigms read settings with config.get(...); valid JSON that is not
        # an object (a list, a number, ...) would only fail later with a
        # confusing attribute error, so reject it here.
        if not isinstance(config, dict):
            print("❌ Config must be a JSON object")
            return

    # Validate query first so obviously malformed input never reaches execute()
    valid, error = paradigm.validate_query(query)
    if not valid:
        print(f"❌ Invalid query: {error}")
        return

    print(f"🚀 Executing {paradigm.name} query...")
    print(f"Query: {query}")
    if config:
        print(f"Config: {config}")
    print()

    try:
        result = paradigm.execute(query, config)
        print(f"⏱️ Execution time: {result.execution_time_ms:.2f}ms")
        print(f"📊 Result count: {result.result_count}")
        print(f"✅ Success: {result.success}")
        if result.error_message:
            print(f"❌ Error: {result.error_message}")
        if result.metadata:
            print("\n📋 Metadata:")
            for key, value in result.metadata.items():
                print(f" {key}: {value}")
        if result.results:
            print("\n📄 Results:")
            # Show only the first 5 rows to keep terminal output short
            for i, row in enumerate(result.results[:5], 1):
                print(f" {i}. {row}")
            if len(result.results) > 5:
                print(f" ... and {len(result.results) - 5} more results")
    except Exception as e:
        # CLI boundary: report any paradigm failure instead of a traceback
        print(f"❌ Execution error: {e}")
def list_categories():
    """Print each category name, alphabetically, with its paradigm count."""
    category_names = registry.get_categories()
    print("📂 Available Categories:")
    for category_name in sorted(category_names):
        members = registry.list_by_category(category_name)
        print(f" {category_name}: {len(members)} paradigms")
def show_translation_matrix():
    """Show paradigm translation capabilities.

    Prints one line per entry of the registry's translation matrix in the
    form ``source → target1, target2``; entries with no targets are shown
    with an explicit "no translations available" marker.
    """
    matrix = registry.get_translation_matrix()
    print("🔄 Paradigm Translation Matrix")
    print("=" * 30)
    print("(Which paradigms can translate to which others)")
    print()
    for source, targets in matrix.items():
        if targets:
            # Separate source and targets with the same arrow used in the
            # empty case; without it the names run together.
            print(f"{source} → {', '.join(targets)}")
        else:
            print(f"{source} → (no translations available)")
def main():
    """Parse command-line arguments and run the selected sub-command.

    With no sub-command the help text is printed. Ctrl-C exits with status 0
    after a goodbye message; any other exception is reported and exits with
    status 1.
    """
    parser = argparse.ArgumentParser(
        description="MarkiTect Query Paradigm Explorer",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
%(prog)s list # List all paradigms
%(prog)s search "semantic" # Search paradigms
%(prog)s show "Natural Language" # Show paradigm details
%(prog)s exec FTS "documentation" # Execute query
%(prog)s categories # List categories
%(prog)s translations # Show translation matrix
""",
    )
    commands = parser.add_subparsers(dest='command', help='Available commands')

    # Sub-commands, declared in the order they appear in --help.
    commands.add_parser('list', help='List all available paradigms')

    search_cmd = commands.add_parser('search', help='Search paradigms')
    search_cmd.add_argument('query', help='Search query')

    show_cmd = commands.add_parser('show', help='Show paradigm details')
    show_cmd.add_argument('name', help='Paradigm name')

    exec_cmd = commands.add_parser('exec', help='Execute query')
    exec_cmd.add_argument('paradigm', help='Paradigm name')
    exec_cmd.add_argument('query', help='Query to execute')
    exec_cmd.add_argument('--config', help='JSON configuration')

    commands.add_parser('categories', help='List categories')
    commands.add_parser('translations', help='Show translation matrix')

    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        return

    # Lambdas defer attribute access: each sub-command only defines the
    # arguments it declared above.
    handlers = {
        'list': lambda: list_paradigms(),
        'search': lambda: search_paradigms(args.query),
        'show': lambda: show_paradigm_details(args.name),
        'exec': lambda: execute_query(args.paradigm, args.query, args.config),
        'categories': lambda: list_categories(),
        'translations': lambda: show_translation_matrix(),
    }

    try:
        handler = handlers.get(args.command)
        if handler is not None:
            handler()
    except KeyboardInterrupt:
        print("\n👋 Goodbye!")
        sys.exit(0)
    except Exception as e:
        print(f"❌ Error: {e}")
        sys.exit(1)
# Run the argparse CLI only when this module is executed as a script.
if __name__ == '__main__':
    main()

View File

@@ -0,0 +1,61 @@
"""
Implementations of various query paradigms.
"""
from .sql_paradigm import SQLQueryParadigm
from .fts_paradigm import FullTextSearchParadigm
from .graphql_paradigm import GraphQLQueryParadigm
from .jsonpath_paradigm import JSONPathQueryParadigm
from .natural_language_paradigm import NaturalLanguageQueryParadigm
from .qbe_paradigm import QueryByExampleParadigm
from .batch_paradigm import BatchManipulationParadigm
from .visual_builder_paradigm import VisualQueryBuilderParadigm
from .rest_api_paradigm import RESTAPIParadigm
from .nosql_paradigm import NoSQLQueryParadigm
from .unix_pipeline_paradigm import UNIXPipelineParadigm
from .xpath_paradigm import XPathParadigm
from .rag_paradigm import RAGParadigm
from .transform_paradigm import DataTransformationParadigm
# Auto-register all paradigms
# Importing this package instantiates each paradigm class once and adds the
# instance to the shared registry.
from ..registry import registry
_paradigms = [
    # Implemented paradigms
    SQLQueryParadigm(),
    FullTextSearchParadigm(),
    GraphQLQueryParadigm(),
    JSONPathQueryParadigm(),
    NaturalLanguageQueryParadigm(),
    # Documentation-only paradigms (not yet implemented)
    QueryByExampleParadigm(),
    BatchManipulationParadigm(),
    VisualQueryBuilderParadigm(),
    RESTAPIParadigm(),
    NoSQLQueryParadigm(),
    UNIXPipelineParadigm(),
    XPathParadigm(),
    RAGParadigm(),
    DataTransformationParadigm()
]
# Register every instance with the registry at import time.
for paradigm in _paradigms:
    registry.register(paradigm)
# Public API: the paradigm classes only (not the instances created above).
__all__ = [
    'SQLQueryParadigm',
    'FullTextSearchParadigm',
    'GraphQLQueryParadigm',
    'JSONPathQueryParadigm',
    'NaturalLanguageQueryParadigm',
    'QueryByExampleParadigm',
    'BatchManipulationParadigm',
    'VisualQueryBuilderParadigm',
    'RESTAPIParadigm',
    'NoSQLQueryParadigm',
    'UNIXPipelineParadigm',
    'XPathParadigm',
    'RAGParadigm',
    'DataTransformationParadigm'
]

View File

@@ -0,0 +1,120 @@
"""
Batch Manipulation Paradigm - Export/Edit/Import workflows.
"""
import time
from typing import Dict, Any, List, Optional
from ..base import BaseQueryParadigm, QueryResult
class BatchManipulationParadigm(BaseQueryParadigm):
    """Documentation-only paradigm describing export/edit/import workflows.

    ``execute`` never performs a batch operation; it always returns an
    unsuccessful result whose metadata marks the paradigm as not implemented.
    """

    @property
    def name(self) -> str:
        """Display name of this paradigm."""
        return "Batch Manipulation"

    @property
    def description(self) -> str:
        """One-sentence summary of the paradigm."""
        return "Export data to external formats (CSV/Excel), edit outside MarkiTect, then re-import with validation"

    @property
    def category(self) -> str:
        """Category this paradigm is listed under."""
        return "procedural"

    @property
    def complexity(self) -> str:
        """Complexity level of this paradigm."""
        return "intermediate"

    def execute(self, query: str, config: Dict[str, Any] = None) -> QueryResult:
        """Return a "not implemented" result for ``query``.

        ``config`` is accepted for interface compatibility and ignored.
        """
        started = time.time()
        # Documentation paradigm: there is no work to time, so the elapsed
        # value is effectively zero.
        elapsed_ms = (time.time() - started) * 1000

        placeholder_metadata = {
            "status": "not_implemented",
            "implementation_issue": "TBD - to be created",
            "description": "Batch manipulation enables export to CSV/Excel, external editing, and validated re-import"
        }
        return QueryResult(
            paradigm=self.name,
            query=query,
            execution_time_ms=elapsed_ms,
            result_count=0,
            results=[],
            metadata=placeholder_metadata,
            success=False,
            error_message="Batch Manipulation paradigm not yet implemented. This paradigm will enable export/edit/import workflows."
        )

    def get_examples(self) -> List[Dict[str, str]]:
        """Return sample batch commands (name, description, query)."""
        export_csv = {
            "name": "Export to CSV",
            "description": "Export query results to CSV for external editing",
            "query": "export --format=csv --query='SELECT * FROM files WHERE type=\"markdown\"' --output=files.csv"
        }
        export_excel = {
            "name": "Export to Excel",
            "description": "Export with multiple sheets for complex data",
            "query": "export --format=xlsx --sheets='files,tags,authors' --output=markitect_data.xlsx"
        }
        import_csv = {
            "name": "Import from CSV",
            "description": "Import edited data with validation",
            "query": "import --format=csv --file=edited_files.csv --validate --dry-run"
        }
        tag_round_trip = {
            "name": "Batch tag update",
            "description": "Export tags, edit in Excel, re-import",
            "query": "export --format=xlsx --table=file_tags --output=tags.xlsx; import --file=tags_edited.xlsx --table=file_tags"
        }
        return [export_csv, export_excel, import_csv, tag_round_trip]

    def validate_query(self, query: str) -> tuple[bool, Optional[str]]:
        """Check that ``query`` is non-empty and starts with a known command.

        Only the first whitespace-separated word is inspected.
        """
        stripped = query.strip()
        if not stripped:
            return False, "Batch operation command cannot be empty"

        allowed = ['export', 'import', 'validate', 'transform']
        verb = stripped.split()[0]
        if verb in allowed:
            return True, None
        return False, f"Command must be one of: {', '.join(allowed)}"

    def get_syntax_help(self) -> str:
        """Return the command reference for batch operations."""
        help_text = """Batch Manipulation Syntax:
Export Operations:
export --format=<csv|xlsx|json> --query="<SQL>" --output=<filename>
export --format=<csv|xlsx|json> --table=<table_name> --output=<filename>
Import Operations:
import --format=<csv|xlsx|json> --file=<filename> [--table=<table>] [--validate] [--dry-run]
Transform Operations:
transform --file=<input> --script=<transformation> --output=<output>
Export Options:
--format: Output format (csv, xlsx, json)
--query: SQL query to define export data
--table: Specific table to export
--output: Output filename
Import Options:
--format: Input format (csv, xlsx, json)
--file: Input filename
--table: Target table (auto-detected if not specified)
--validate: Validate data before import
--dry-run: Show what would be imported without actually importing
Examples:
export --format=csv --query="SELECT * FROM files WHERE author='Alice'" --output=alice_files.csv
import --format=csv --file=edited_files.csv --validate --dry-run
"""
        return help_text

View File

@@ -0,0 +1,271 @@
"""
Full Text Search Paradigm - FTS5-powered content search.
"""
import time
from typing import Dict, Any, List, Optional
from ..base import BaseQueryParadigm, QueryResult
class FullTextSearchParadigm(BaseQueryParadigm):
    """Full text search paradigm using FTS5 for content discovery.

    Searching is delegated to ``FTSSearchPlugin``; this class adapts its
    results to ``QueryResult`` and adds query validation, help text and
    simple translation from natural-language and SQL ``LIKE`` queries.
    """

    # Database file used whenever the caller's config does not name one.
    _DEFAULT_DB_PATH = 'markitect.db'

    # Source paradigms translate_query() understands, in normalized form.
    _TRANSLATABLE_FROM = ("natural_language", "sql")

    @property
    def name(self) -> str:
        """Display name of this paradigm."""
        return "FTS"

    @property
    def description(self) -> str:
        """One-sentence summary of the paradigm."""
        return "Full text search across markdown content using SQLite FTS5 for semantic discovery"

    @property
    def category(self) -> str:
        """Category this paradigm is listed under."""
        return "textual"

    @property
    def complexity(self) -> str:
        """Complexity level of this paradigm."""
        return "beginner"

    def execute(self, query: str, config: Optional[Dict[str, Any]] = None) -> QueryResult:
        """Execute full text search query.

        Args:
            query: FTS query text.
            config: Optional settings. Recognized keys: ``db_path``
                (default ``markitect.db``), ``content_type`` (default
                ``'all'``), ``limit`` (default 20), ``offset`` (default 0).

        Returns:
            A QueryResult. Any exception, including a failed plugin import,
            is reported as ``success=False`` with the exception text in
            ``error_message`` instead of being raised.
        """
        start_time = time.time()
        try:
            from ...plugins.builtin.search.fts_search import FTSSearchPlugin

            settings = config or {}
            # Fall back to the default database even when a config is given
            # but carries no db_path; otherwise None would reach the plugin.
            db_path = settings.get('db_path') or self._DEFAULT_DB_PATH
            content_type = settings.get('content_type', 'all')
            limit = settings.get('limit', 20)
            offset = settings.get('offset', 0)

            # Create FTS search plugin and execute search
            fts_plugin = FTSSearchPlugin()
            search_results = fts_plugin.search(
                db_path=db_path,
                query=query,
                content_type=content_type,
                limit=limit,
                offset=offset
            )
            execution_time = (time.time() - start_time) * 1000

            # Convert FTS results to standard format; hits of any other type
            # are skipped.
            results = []
            for result in search_results:
                if result['type'] == 'file':
                    results.append({
                        'type': 'file',
                        'score': result['score'],
                        'filename': result['file']['filename'],
                        'content_preview': result.get('highlight', ''),
                        'file_id': result['file']['id'],
                        'created_at': result['file']['created_at']
                    })
                elif result['type'] == 'schema':
                    results.append({
                        'type': 'schema',
                        'score': result['score'],
                        'filename': result['schema']['filename'],
                        'title': result['schema']['title'],
                        'description': result['schema']['description'],
                        'schema_id': result['schema']['id'],
                        'highlight': result.get('highlight', '')
                    })

            return QueryResult(
                paradigm="FTS",
                query=query,
                execution_time_ms=execution_time,
                result_count=len(results),
                results=results,
                metadata={
                    "content_type": content_type,
                    "fts_enabled": True,
                    "query_type": self._detect_query_type(query)
                },
                success=True
            )
        except Exception as e:
            # Paradigm boundary: callers expect a QueryResult, never a raise.
            execution_time = (time.time() - start_time) * 1000
            return QueryResult(
                paradigm="FTS",
                query=query,
                execution_time_ms=execution_time,
                result_count=0,
                results=[],
                metadata={"fts_enabled": False},
                success=False,
                error_message=str(e)
            )

    def get_examples(self) -> List[Dict[str, str]]:
        """Get example FTS queries (name, description, query)."""
        return [
            {
                "name": "Simple search",
                "description": "Find documents containing specific words",
                "query": "documentation"
            },
            {
                "name": "Multiple terms",
                "description": "Search for documents with multiple terms",
                "query": "API documentation"
            },
            {
                "name": "Exact phrase",
                "description": "Search for exact phrases",
                "query": '"getting started"'
            },
            {
                "name": "Boolean search",
                "description": "Use AND/OR operators",
                "query": "API AND documentation NOT deprecated"
            },
            {
                "name": "Wildcard search",
                "description": "Prefix matching with wildcards",
                "query": "config*"
            },
            {
                "name": "Proximity search",
                "description": "Find terms near each other",
                "query": "NEAR(database query, 5)"
            }
        ]

    def validate_query(self, query: str) -> tuple[bool, Optional[str]]:
        """Validate FTS query syntax.

        Returns ``(is_valid, error_message)``. The plugin's ``QueryParser``
        is used when it can be imported; otherwise a basic quote and
        parenthesis balance check is applied.
        """
        if not query or not query.strip():
            return False, "Query cannot be empty"
        # Use the FTS query parser for validation
        try:
            from ...plugins.builtin.search.query_parser import QueryParser
            parser = QueryParser()
            return parser.validate_query(query)
        except ImportError:
            # Fallback validation
            return self._basic_validation(query)

    def _basic_validation(self, query: str) -> tuple[bool, Optional[str]]:
        """Check that double quotes and parentheses are balanced by count."""
        # Check for balanced quotes
        if query.count('"') % 2 != 0:
            return False, "Unmatched quotes in query"
        # Check for balanced parentheses
        if query.count('(') != query.count(')'):
            return False, "Unmatched parentheses in query"
        return True, None

    def get_syntax_help(self) -> str:
        """Get FTS syntax help."""
        return """Full Text Search Syntax:
Basic Search:
word - Find documents containing 'word'
multiple words - Find documents with all words (implicit AND)
Phrase Search:
"exact phrase" - Find exact phrase
Boolean Operators:
word1 AND word2 - Both words must be present
word1 OR word2 - Either word can be present
word1 NOT word2 - First word present, second word absent
Wildcards:
prefix* - Prefix matching (config* matches configuration)
Proximity Search:
NEAR(word1 word2, 5) - Words within 5 words of each other
Column-Specific:
filename:readme - Search only in filename field
content:tutorial - Search only in content field
Examples:
documentation
"getting started"
API AND documentation
config* OR setting*
NEAR(database query, 10)
"""

    def _detect_query_type(self, query: str) -> str:
        """Classify ``query`` by the first FTS feature found.

        Checked in order: phrase, proximity, boolean, wildcard, column,
        else simple.
        """
        query_upper = query.upper()
        if '"' in query:
            return "phrase_search"
        elif 'NEAR(' in query_upper:
            return "proximity_search"
        elif any(op in query_upper for op in [' AND ', ' OR ', ' NOT ']):
            return "boolean_search"
        elif '*' in query:
            return "wildcard_search"
        elif ':' in query:
            return "column_search"
        else:
            return "simple_search"

    @staticmethod
    def _normalize_paradigm_name(paradigm_name: str) -> str:
        """Reduce a paradigm name to lowercase with underscores.

        Makes the display name "Natural Language" and the identifier
        "natural_language" compare equal.
        """
        return paradigm_name.strip().lower().replace(' ', '_').replace('-', '_')

    def can_translate_from(self, other_paradigm: str) -> bool:
        """Check if we can translate from another paradigm.

        Accepts either the identifier form ("natural_language", "sql") or
        the display form ("Natural Language", "SQL").
        """
        return self._normalize_paradigm_name(other_paradigm) in self._TRANSLATABLE_FROM

    def translate_query(self, query: str, from_paradigm: str) -> Optional[str]:
        """Translate from another paradigm to FTS.

        Returns the FTS query text, or None when the source paradigm is
        unsupported or nothing usable could be extracted.
        """
        source = self._normalize_paradigm_name(from_paradigm)
        if source == "natural_language":
            return self._translate_natural_language_to_fts(query)
        if source == "sql":
            return self._translate_sql_to_fts(query)
        return None

    def _translate_natural_language_to_fts(self, query: str) -> Optional[str]:
        """Translate natural language to FTS query.

        Tries, in order: text after "search for", a quoted term in a
        "find ... contain ..." sentence, then plain keyword extraction.
        """
        query_lower = query.lower()

        # Extract key terms and convert to FTS syntax
        if "search for" in query_lower:
            search_term = query_lower.split("search for", 1)[1].strip()
            # Nothing after "search for" would yield an empty (invalid) FTS
            # query, so fall through to the other strategies in that case.
            if search_term:
                return search_term.replace(" and ", " AND ").replace(" or ", " OR ")

        if "find" in query_lower and "contain" in query_lower:
            # Extract the quoted term in "find ... contain ... '<term>'"
            import re
            match = re.search(r'find.*?contain.*?["\'](.+?)["\']', query_lower)
            if match:
                return f'"{match.group(1)}"'

        # Simple keyword extraction: longer words that are not filler
        filler_words = ['find', 'search', 'for', 'documents', 'files']
        keywords = [word for word in query.split() if len(word) > 3 and word.lower() not in filler_words]
        if keywords:
            return " AND ".join(keywords)
        return None

    def _translate_sql_to_fts(self, query: str) -> Optional[str]:
        """Translate simple SQL LIKE queries to FTS.

        Every ``LIKE '%term%'`` pattern contributes one term; the terms are
        joined with AND. Returns None when no such pattern is present.
        """
        if 'LIKE' in query.upper():
            import re
            # Extract LIKE patterns
            like_matches = re.findall(r"LIKE\s+'%(.+?)%'", query, re.IGNORECASE)
            if like_matches:
                return " AND ".join(like_matches)
        return None

View File

@@ -0,0 +1,419 @@
"""
GraphQL Query Paradigm - Flexible graph-based queries.
"""
import time
import json
from typing import Dict, Any, List, Optional
from ..base import BaseQueryParadigm, QueryResult
class GraphQLQueryParadigm(BaseQueryParadigm):
"""GraphQL query paradigm for flexible, graph-based data access."""
@property
def name(self) -> str:
    """Display name of this paradigm."""
    return "GraphQL"
@property
def description(self) -> str:
    """One-sentence summary of the paradigm."""
    return "Graph-based queries with precise field selection and nested data relationships"
@property
def category(self) -> str:
    """Category this paradigm is listed under."""
    return "structural"
@property
def complexity(self) -> str:
    """Complexity level of this paradigm."""
    return "intermediate"
def execute(self, query: str, config: Optional[Dict[str, Any]] = None) -> QueryResult:
    """Execute GraphQL query.

    Args:
        query: GraphQL document text.
        config: Optional settings. Recognized keys: ``db_path`` (default
            ``markitect.db``) and ``variables`` (a dict, or a JSON string
            that decodes to the variables).

    Returns:
        A QueryResult with one row per top-level list item, dict or scalar
        in the response data. GraphQL errors and any raised exception are
        reported as ``success=False`` rather than raised.
    """
    start_time = time.time()
    try:
        # NOTE(review): GraphQLResolvers is not referenced below; the import
        # is kept in case it is needed for its side effects — confirm.
        from ...graphql.resolvers import GraphQLResolvers
        from ...graphql.schema import schema

        settings = config or {}
        # Fall back to the default database even when a config is given but
        # carries no db_path; otherwise None would reach the resolvers.
        db_path = settings.get('db_path') or 'markitect.db'

        # Parse variables if provided (JSON text or an already-built dict)
        variables = {}
        if 'variables' in settings:
            raw_variables = settings['variables']
            if isinstance(raw_variables, str):
                variables = json.loads(raw_variables)
            elif isinstance(raw_variables, dict):
                variables = raw_variables

        # Execute GraphQL query
        result = schema.execute(query, variable_values=variables, context={'db_path': db_path})
        execution_time = (time.time() - start_time) * 1000

        if result.errors:
            # Only the first GraphQL error is surfaced to the caller.
            return QueryResult(
                paradigm="GraphQL",
                query=query,
                execution_time_ms=execution_time,
                result_count=0,
                results=[],
                metadata={"variables": variables},
                success=False,
                error_message=str(result.errors[0])
            )

        # Convert GraphQL result to standard format
        results = []
        result_data = result.data or {}
        # Handle different result types: lists become one row per item,
        # dicts one row, scalars a single "value" row.
        for key, value in result_data.items():
            if isinstance(value, list):
                for item in value:
                    results.append({
                        "query_field": key,
                        **self._flatten_graphql_result(item)
                    })
            elif isinstance(value, dict):
                results.append({
                    "query_field": key,
                    **self._flatten_graphql_result(value)
                })
            else:
                results.append({
                    "query_field": key,
                    "value": value
                })

        return QueryResult(
            paradigm="GraphQL",
            query=query,
            execution_time_ms=execution_time,
            result_count=len(results),
            results=results,
            metadata={
                "variables": variables,
                "query_type": self._detect_query_type(query)
            },
            success=True
        )
    except Exception as e:
        # Paradigm boundary: callers expect a QueryResult, never a raise.
        execution_time = (time.time() - start_time) * 1000
        return QueryResult(
            paradigm="GraphQL",
            query=query,
            execution_time_ms=execution_time,
            result_count=0,
            results=[],
            metadata={},
            success=False,
            error_message=str(e)
        )
def get_examples(self) -> List[Dict[str, str]]:
    """Get example GraphQL queries.

    Returns a list of dicts with the keys ``name``, ``description`` and
    ``query``.
    """
    return [
        {
            "name": "Basic file query",
            "description": "Get basic information about markdown files",
            # Field is hasFrontMatter (single "s"), matching the argument
            # name used in the next example.
            "query": """query {
markdownFiles(limit: 5) {
id
filename
wordCount
hasFrontMatter
}
}"""
        },
        {
            "name": "File with front matter",
            "description": "Get files with their front matter data",
            "query": """query {
markdownFiles(hasFrontMatter: true) {
filename
frontMatter {
key
value
}
createdAt
}
}"""
        },
        {
            "name": "Schema information",
            "description": "Get schema details and statistics",
            "query": """query {
schemas {
filename
title
description
schemaVersion
propertyCount
}
}"""
        },
        {
            "name": "Search with variables",
            "description": "Search using variables",
            "query": """query SearchContent($searchTerm: String!) {
search(query: $searchTerm, limit: 10) {
type
score
file {
filename
wordCount
}
highlight
}
}"""
        },
        {
            "name": "Database statistics",
            "description": "Get overall database statistics",
            "query": """query {
databaseStats {
totalFiles
totalSchemas
totalSizeBytes
lastUpdated
}
}"""
        },
        {
            "name": "Specific file by ID",
            "description": "Get detailed information about a specific file",
            "query": """query GetFile($fileId: Int!) {
markdownFile(id: $fileId) {
filename
content
frontMatterRaw
wordCount
lineCount
createdAt
}
}"""
        }
    ]
def validate_query(self, query: str) -> tuple[bool, Optional[str]]:
    """Validate GraphQL query syntax with lightweight structural checks.

    Checks that the text is non-empty, starts with an operation keyword,
    a ``fragment`` definition or a bare ``{`` selection set, and that
    braces and parentheses are balanced by count. This is not a parser.

    Returns:
        ``(True, None)`` when all checks pass, otherwise ``(False, reason)``.
    """
    if not query or not query.strip():
        return False, "Query cannot be empty"

    # Basic GraphQL syntax validation
    query = query.strip()

    # A document may open with an operation, a shorthand selection set, or a
    # fragment definition that a later operation uses.
    allowed_starts = ('query', 'mutation', 'subscription', 'fragment', '{')
    if not query.startswith(allowed_starts):
        return False, "GraphQL query must start with 'query', 'mutation', 'subscription', 'fragment', or '{'"

    # Check for balanced braces
    if query.count('{') != query.count('}'):
        return False, "Unmatched braces in GraphQL query"

    # Check for balanced parentheses
    if query.count('(') != query.count(')'):
        return False, "Unmatched parentheses in GraphQL query"

    return True, None
def get_syntax_help(self) -> str:
    """Return the GraphQL cheat sheet for this paradigm.

    The text covers the basic structure, root fields, field selection,
    variables, aliases and fragments.
    """
    help_text = """GraphQL Query Syntax:
Basic Structure:
query {
fieldName {
subfield
}
}
Available Root Fields:
- markdownFile(id: Int, filename: String)
- markdownFiles(limit: Int, offset: Int, hasFrontMatter: Boolean)
- schema(id: Int, filename: String)
- schemas(limit: Int, offset: Int)
- search(query: String!, type: String, limit: Int)
- databaseStats
- astQuery(fileId: Int, filename: String, jsonpath: String!)
Field Selection:
markdownFiles {
id
filename
wordCount
frontMatter {
key
value
}
}
Variables:
query GetFile($id: Int!) {
markdownFile(id: $id) {
filename
content
}
}
Aliases:
query {
recent: markdownFiles(limit: 5) { filename }
old: markdownFiles(offset: 100, limit: 5) { filename }
}
Fragments:
fragment FileInfo on MarkdownFile {
id
filename
wordCount
}
query {
markdownFiles {
...FileInfo
createdAt
}
}
"""
    return help_text
def _detect_query_type(self, query: str) -> str:
"""Detect GraphQL query type."""
query_lower = query.lower().strip()
if query_lower.startswith('mutation'):
return "mutation"
elif query_lower.startswith('subscription'):
return "subscription"
elif 'search(' in query_lower:
return "search_query"
elif 'astquery(' in query_lower:
return "ast_query"
elif any(field in query_lower for field in ['markdownfiles', 'schemas']):
return "list_query"
elif any(field in query_lower for field in ['markdownfile', 'schema']):
return "single_query"
elif 'databasestats' in query_lower:
return "stats_query"
else:
return "query"
def _flatten_graphql_result(self, item: Any) -> Dict[str, Any]:
"""Flatten GraphQL result for standardized output."""
if isinstance(item, dict):
flattened = {}
for key, value in item.items():
if isinstance(value, (dict, list)):
flattened[key] = json.dumps(value) if isinstance(value, dict) else value
else:
flattened[key] = value
return flattened
else:
return {"value": item}
def can_translate_from(self, other_paradigm: str) -> bool:
    """Tell whether queries of ``other_paradigm`` can be converted to GraphQL.

    The comparison is case-insensitive.
    """
    supported_sources = ["sql", "natural_language"]
    source = other_paradigm.lower()
    return source in supported_sources
def translate_query(self, query: str, from_paradigm: str) -> Optional[str]:
    """Convert ``query`` from another paradigm into GraphQL.

    Dispatches on the case-insensitive source paradigm name and returns
    ``None`` for sources other than SQL and natural language.
    """
    source = from_paradigm.lower()
    if source == "sql":
        return self._translate_sql_to_graphql(query)
    if source == "natural_language":
        return self._translate_natural_language_to_graphql(query)
    return None
def _translate_sql_to_graphql(self, query: str) -> Optional[str]:
"""Translate simple SQL to GraphQL."""
query_upper = query.upper().strip()
# Simple translations for common patterns
if 'SELECT * FROM markdown_files' in query_upper:
return """query {
markdownFiles {
id
filename
content
createdAt
}
}"""
elif 'SELECT filename FROM markdown_files' in query_upper:
return """query {
markdownFiles {
filename
}
}"""
elif 'SELECT * FROM schemas' in query_upper:
return """query {
schemas {
id
filename
title
description
schemaContent
}
}"""
elif 'COUNT(*) FROM markdown_files' in query_upper:
return """query {
databaseStats {
totalFiles
}
}"""
return None
def _translate_natural_language_to_graphql(self, query: str) -> Optional[str]:
"""Translate natural language to GraphQL."""
query_lower = query.lower()
if "all files" in query_lower or "list files" in query_lower:
return """query {
markdownFiles {
id
filename
wordCount
createdAt
}
}"""
elif "search for" in query_lower:
# Extract search term
parts = query_lower.split("search for", 1)
if len(parts) > 1:
search_term = parts[1].strip().strip('"\'')
return f'''query {{
search(query: "{search_term}") {{
type
score
file {{
filename
}}
highlight
}}
}}'''
elif "database statistics" in query_lower or "stats" in query_lower:
return """query {
databaseStats {
totalFiles
totalSchemas
totalSizeBytes
lastUpdated
}
}"""
elif "schemas" in query_lower:
return """query {
schemas {
filename
title
description
}
}"""
return None

View File

@@ -0,0 +1,333 @@
"""
JSONPath Query Paradigm - Path-based navigation through AST structures.
"""
import time
import json
from typing import Dict, Any, List, Optional
from ..base import BaseQueryParadigm, QueryResult
class JSONPathQueryParadigm(BaseQueryParadigm):
"""JSONPath query paradigm for navigating AST structures."""
@property
def name(self) -> str:
    """Return the paradigm name, ``"JSONPath"``."""
    paradigm_name = "JSONPath"
    return paradigm_name
@property
def description(self) -> str:
    """Return the one-line summary of the JSONPath paradigm."""
    summary = "XPath-like navigation through AST trees for precise structural queries"
    return summary
@property
def category(self) -> str:
    """Return the category label, ``"structural"``."""
    category_label = "structural"
    return category_label
@property
def complexity(self) -> str:
    """Return the complexity label, ``"advanced"``."""
    complexity_label = "advanced"
    return complexity_label
def execute(self, query: str, config: Optional[Dict[str, Any]] = None) -> QueryResult:
    """Run a JSONPath expression against the stored AST of one file.

    Args:
        query: JSONPath expression.
        config: Optional settings. Keys read here: ``db_path`` (defaults to
            ``'markitect.db'``), ``file_id`` and ``filename``; one of the
            latter two must be set to select the file.

    Returns:
        A ``QueryResult``. On success each result holds the match ``path``,
        ``value`` and ``context``. Any exception is caught and reported as
        a failed result with ``error_message`` set.
    """
    start_time = time.time()
    try:
        from ...ast_service import ASTService

        settings = config or {}
        # Apply the default also when a config is given without 'db_path';
        # previously that case passed None on to ASTService.
        db_path = settings.get('db_path') or 'markitect.db'
        file_id = settings.get('file_id')
        filename = settings.get('filename')
        if not file_id and not filename:
            raise ValueError("Either file_id or filename must be provided for JSONPath queries")

        ast_service = ASTService(db_path)
        if file_id:
            ast_data = ast_service.get_ast_by_file_id(file_id)
        else:
            ast_data = ast_service.get_ast_by_filename(filename)
        if not ast_data:
            target = 'file_id=' + str(file_id) if file_id else 'filename=' + filename
            raise ValueError(f"No AST found for {target}")

        try:
            # The extended parser is required for filter expressions such as
            # [?(@.level == 1)], which the examples and syntax help use; the
            # base jsonpath_ng.parse does not support them.
            from jsonpath_ng.ext import parse as parse_jsonpath
        except ImportError:
            # Library not installed: fall back to simple dot-notation paths.
            results = self._simple_path_query(query, ast_data)
        else:
            results = [
                {
                    "path": str(match.full_path),
                    "value": match.value,
                    "context": self._get_context(match, ast_data),
                }
                for match in parse_jsonpath(query).find(ast_data)
            ]

        execution_time = (time.time() - start_time) * 1000
        return QueryResult(
            paradigm="JSONPath",
            query=query,
            execution_time_ms=execution_time,
            result_count=len(results),
            results=results,
            metadata={
                "file_id": file_id,
                "filename": filename,
                "ast_available": True,
                "query_type": self._detect_query_type(query)
            },
            success=True
        )
    except Exception as e:
        # Failures are returned to the caller as a failed result, not raised.
        execution_time = (time.time() - start_time) * 1000
        return QueryResult(
            paradigm="JSONPath",
            query=query,
            execution_time_ms=execution_time,
            result_count=0,
            results=[],
            metadata={"ast_available": False},
            success=False,
            error_message=str(e)
        )
def get_examples(self) -> List[Dict[str, str]]:
    """Return sample JSONPath queries.

    Each entry is a dict with ``name``, ``description`` and ``query`` keys.
    """
    catalog = (
        ("All headings", "Find all heading nodes in the AST", "$..heading"),
        ("Top-level headings", "Find only H1 headings", "$..heading[?(@.level == 1)]"),
        ("Code blocks", "Find all code block nodes", "$..code_block"),
        ("Links with URLs", "Find all link nodes with their URLs", "$..link[?(@.url)]"),
        ("Image sources", "Extract all image source URLs", "$..image.src"),
        ("List items", "Find all list item contents", "$..list_item.children[*].text"),
        ("Nested structures", "Find deeply nested elements", "$..children[*].children[*].type"),
        ("Content with attributes", "Find nodes with specific attributes", "$..node[?(@.attrs.class)]"),
    )
    return [
        {"name": title, "description": summary, "query": expression}
        for title, summary, expression in catalog
    ]
def validate_query(self, query: str) -> tuple[bool, Optional[str]]:
    """Run a lightweight structural check on a JSONPath expression.

    Verifies that the expression is non-empty, starts with ``$`` and has
    properly nested brackets and parentheses. It does not parse the
    expression.

    Args:
        query: JSONPath expression.

    Returns:
        ``(True, None)`` when every check passes, otherwise
        ``(False, reason)``.
    """
    if not query or not query.strip():
        return False, "Query cannot be empty"

    if not query.startswith('$'):
        return False, "JSONPath queries must start with '$'"

    # Track nesting depth rather than comparing counts, so a closer placed
    # before its opener (e.g. "$.a]b[") is rejected too.
    for opener, closer, label in (('[', ']', 'brackets'), ('(', ')', 'parentheses')):
        depth = 0
        for char in query:
            if char == opener:
                depth += 1
            elif char == closer:
                depth -= 1
                if depth < 0:
                    break
        if depth != 0:
            return False, f"Unmatched {label} in JSONPath query"

    return True, None
def get_syntax_help(self) -> str:
    """Return the JSONPath cheat sheet for this paradigm.

    The text covers navigation, array operations, filter expressions,
    common AST patterns and operators.
    """
    help_text = """JSONPath Query Syntax:
Basic Navigation:
$ - Root node
.child - Direct child access
..child - Recursive descent (any level)
[*] - All array elements
[0] - First array element
[-1] - Last array element
Array Operations:
[start:end] - Array slice
[0,1,2] - Multiple specific indices
Filter Expressions:
[?(@.field)] - Filter by field existence
[?(@.field == 'value')] - Filter by field value
[?(@.level > 1)] - Numeric comparison
[?(@.type == 'heading')] - String comparison
Common AST Patterns:
$..heading - All headings at any level
$..heading[?(@.level==1)] - Only H1 headings
$..code_block.language - Programming languages used
$..link.url - All link URLs
$..image.src - All image sources
$..list_item.text - List item contents
Advanced Examples:
$.children[*].type - Types of top-level elements
$..children[?(@.type=='text')].content - All text content
$..node[?(@.attrs.class)] - Nodes with CSS classes
Operators:
==, != - Equality
<, <=, >, >= - Comparison
=~ - Regular expression match
in - Membership test
Note: JSONPath queries operate on the parsed AST structure of markdown files.
Use 'markitect ast-show <file>' to see the AST structure first.
"""
    return help_text
def _detect_query_type(self, query: str) -> str:
"""Detect JSONPath query type."""
if '..' in query:
return "recursive_descent"
elif '[?' in query:
return "filtered_query"
elif '[*]' in query:
return "array_wildcard"
elif any(op in query for op in ['[0]', '[1]', '[-1]']):
return "indexed_access"
elif ':' in query and '[' in query:
return "array_slice"
else:
return "direct_access"
def _get_context(self, match, ast_data: Dict) -> Dict[str, Any]:
"""Get context information for a JSONPath match."""
context = {
"parent_path": None,
"sibling_count": 0,
"depth": len(str(match.full_path).split('.'))
}
# Try to get parent context
path_parts = str(match.full_path).split('.')
if len(path_parts) > 1:
context["parent_path"] = '.'.join(path_parts[:-1])
return context
def _simple_path_query(self, query: str, data: Any) -> List[Dict[str, Any]]:
"""Simple fallback JSONPath implementation using dot notation."""
results = []
try:
# Very basic implementation for simple paths
if query == '$':
results.append({
"path": "$",
"value": data,
"context": {"depth": 0}
})
elif query.startswith('$.'):
# Simple dot notation
path_parts = query[2:].split('.')
current = data
current_path = "$"
for part in path_parts:
current_path += f".{part}"
if isinstance(current, dict) and part in current:
current = current[part]
elif isinstance(current, list) and part == '*':
# Handle wildcard for arrays
for i, item in enumerate(current):
results.append({
"path": f"{current_path}[{i}]",
"value": item,
"context": {"depth": len(path_parts)}
})
return results
else:
break
if current is not None:
results.append({
"path": current_path,
"value": current,
"context": {"depth": len(path_parts)}
})
except Exception:
pass
return results
def can_translate_from(self, other_paradigm: str) -> bool:
    """Tell whether queries of ``other_paradigm`` can be converted to JSONPath.

    Only natural language is supported; the comparison is case-insensitive.
    """
    supported_sources = ["natural_language"]
    source = other_paradigm.lower()
    return source in supported_sources
def translate_query(self, query: str, from_paradigm: str) -> Optional[str]:
    """Convert ``query`` from another paradigm into JSONPath.

    Returns ``None`` for every source other than natural language.
    """
    if from_paradigm.lower() != "natural_language":
        return None
    return self._translate_natural_language_to_jsonpath(query)
def _translate_natural_language_to_jsonpath(self, query: str) -> Optional[str]:
"""Translate natural language to JSONPath."""
query_lower = query.lower()
# Common patterns
if "all headings" in query_lower or "find headings" in query_lower:
return "$..heading"
elif "first heading" in query_lower or "main heading" in query_lower:
return "$..heading[0]"
elif "code blocks" in query_lower:
return "$..code_block"
elif "links" in query_lower:
return "$..link"
elif "images" in query_lower:
return "$..image"
elif "list items" in query_lower:
return "$..list_item"
elif "all text" in query_lower:
return "$..text"
# Level-specific patterns
if "h1" in query_lower or "level 1" in query_lower:
return "$..heading[?(@.level == 1)]"
elif "h2" in query_lower or "level 2" in query_lower:
return "$..heading[?(@.level == 2)]"
return None

View File

@@ -0,0 +1,400 @@
"""
Natural Language Query Paradigm - Human-friendly query interface.
"""
import time
import re
from typing import Dict, Any, List, Optional
from ..base import BaseQueryParadigm, QueryResult
class NaturalLanguageQueryParadigm(BaseQueryParadigm):
"""Natural language query paradigm for intuitive, human-friendly queries."""
@property
def name(self) -> str:
    """Return the paradigm name, ``"Natural Language"``."""
    paradigm_name = "Natural Language"
    return paradigm_name
@property
def description(self) -> str:
    """Return the one-line summary of the natural language paradigm."""
    summary = "Human-friendly queries that translate to appropriate technical paradigms"
    return summary
@property
def category(self) -> str:
    """Return the category label, ``"semantic"``."""
    category_label = "semantic"
    return category_label
@property
def complexity(self) -> str:
    """Return the complexity label, ``"beginner"``."""
    complexity_label = "beginner"
    return complexity_label
def execute(self, query: str, config: Dict[str, Any] = None) -> QueryResult:
    """Answer a plain-English query by delegating to another paradigm.

    The query is translated, the target paradigm is looked up in the
    registry and executed, and its result is relabelled as "Natural
    Language" with the translation details added to its metadata. Any
    exception is reported as a failed ``QueryResult``.
    """
    started = time.time()

    def elapsed_ms() -> float:
        return (time.time() - started) * 1000

    try:
        # Pick the paradigm and the query text to hand over to it.
        target_name, target_query = self._analyze_and_translate(query)
        if not target_name or not target_query:
            raise ValueError(f"Could not understand query: '{query}'")

        from ..registry import registry
        delegate = registry.get(target_name)
        if not delegate:
            raise ValueError(f"Paradigm '{target_name}' not available")

        outcome = delegate.execute(target_query, config)

        # Relabel the delegate's result and record how it was obtained.
        outcome.paradigm = "Natural Language"
        outcome.metadata.update({
            "original_query": query,
            "translated_to": target_name,
            "translated_query": target_query,
            "query_intent": self._detect_intent(query)
        })
        # Report the total time, translation included.
        outcome.execution_time_ms = elapsed_ms()
        return outcome
    except Exception as exc:
        return QueryResult(
            paradigm="Natural Language",
            query=query,
            execution_time_ms=elapsed_ms(),
            result_count=0,
            results=[],
            metadata={"query_intent": self._detect_intent(query)},
            success=False,
            error_message=str(exc)
        )
def get_examples(self) -> List[Dict[str, str]]:
    """Return sample plain-English queries.

    Each entry is a dict with ``name``, ``description`` and ``query`` keys.
    """
    catalog = (
        ("Find files", "List and discover files in the system", "Show me all the files"),
        ("Search content", "Search for specific content", "Find documents about API documentation"),
        ("Recent activity", "Find recently modified content", "What files were created recently?"),
        ("File statistics", "Get information about file sizes and counts", "How many files do I have?"),
        ("Content analysis", "Analyze document structure", "Show me all the headings in the documentation"),
        ("Schema exploration", "Discover schemas and their properties", "What schemas are available?"),
        ("Large files", "Find files by size criteria", "Which files are the largest?"),
        ("Front matter search", "Find files with metadata", "Show files that have front matter"),
    )
    return [
        {"name": title, "description": summary, "query": question}
        for title, summary, question in catalog
    ]
def validate_query(self, query: str) -> tuple[bool, Optional[str]]:
    """Check that a plain-English query has a usable length.

    Rejects empty or whitespace-only text, text shorter than 3 characters
    after trimming, and text longer than 500 characters untrimmed.
    """
    if not query or not query.strip():
        return False, "Query cannot be empty"

    trimmed_length = len(query.strip())
    if trimmed_length < 3:
        return False, "Query too short - please be more specific"

    raw_length = len(query)
    if raw_length > 500:
        return False, "Query too long - please be more concise"

    return True, None
def get_syntax_help(self) -> str:
    """Return the usage guide for plain-English queries.

    The text lists common phrasing patterns by topic, followed by tips.
    """
    help_text = """Natural Language Query Help:
You can ask questions in plain English! The system will automatically
translate your query to the most appropriate technical format.
Common Patterns:
File Discovery:
"Show me all files"
"List the markdown files"
"What files do I have?"
Content Search:
"Find documents about X"
"Search for API documentation"
"Show files containing 'tutorial'"
File Analysis:
"Which files are the largest?"
"Show recent files"
"Find files with front matter"
Structure Analysis:
"Show me all headings"
"Find all code blocks"
"What links are in the files?"
Statistics:
"How many files do I have?"
"What's the total size?"
"Show database statistics"
Schema Queries:
"What schemas are available?"
"Show schema information"
Tips:
- Be specific about what you want to find
- Use natural questions like "What..." or "Show me..."
- Mention specific content types (files, schemas, headings, etc.)
- Use time references like "recent" or "latest"
The system supports various query types and will choose the best
method to answer your question automatically.
"""
    return help_text
def _analyze_and_translate(self, query: str) -> tuple[Optional[str], Optional[str]]:
"""Analyze natural language query and translate to appropriate paradigm."""
query_lower = query.lower().strip()
# Intent detection with paradigm mapping
intent_patterns = [
# Full text search patterns
(r'find.*about|search.*for|documents.*contain|content.*with', 'fts', self._translate_to_fts),
# File listing patterns
(r'show.*files|list.*files|all.*files|files.*have', 'sql', self._translate_to_sql_files),
# Statistics patterns
(r'how many|count|total|statistics|stats', 'sql', self._translate_to_sql_stats),
# Size/analysis patterns
(r'largest|biggest|smallest|size|length', 'sql', self._translate_to_sql_size),
# Recent/time patterns
(r'recent|latest|new|created.*ago|modified', 'sql', self._translate_to_sql_recent),
# Schema patterns
(r'schema|schemas|json.*schema', 'graphql', self._translate_to_graphql_schemas),
# Structure patterns (headings, links, etc.)
(r'heading|headings|links|code.*block|structure', 'jsonpath', self._translate_to_jsonpath),
# Front matter patterns
(r'front.*matter|metadata|yaml.*header', 'sql', self._translate_to_sql_frontmatter),
# General GraphQL patterns
(r'show.*detailed|complete.*information|comprehensive', 'graphql', self._translate_to_graphql_detailed)
]
# Try to match patterns
for pattern, paradigm, translator in intent_patterns:
if re.search(pattern, query_lower):
translated = translator(query)
if translated:
return paradigm, translated
# Fallback: try FTS for any remaining search-like queries
if any(word in query_lower for word in ['find', 'search', 'show', 'get', 'contains']):
translated = self._translate_to_fts(query)
if translated:
return 'fts', translated
return None, None
def _detect_intent(self, query: str) -> str:
"""Detect the intent of the natural language query."""
query_lower = query.lower()
if any(word in query_lower for word in ['find', 'search', 'about', 'contain']):
return "content_search"
elif any(word in query_lower for word in ['list', 'show', 'all', 'files']):
return "file_listing"
elif any(word in query_lower for word in ['count', 'how many', 'statistics']):
return "statistics"
elif any(word in query_lower for word in ['recent', 'latest', 'new']):
return "temporal_query"
elif any(word in query_lower for word in ['large', 'big', 'small', 'size']):
return "size_analysis"
elif any(word in query_lower for word in ['schema', 'schemas']):
return "schema_query"
elif any(word in query_lower for word in ['heading', 'structure', 'link']):
return "structure_analysis"
else:
return "general_query"
def _translate_to_fts(self, query: str) -> Optional[str]:
"""Translate to full text search query."""
query_lower = query.lower()
# Extract search terms
search_terms = []
# Look for "about X" or "containing X"
about_match = re.search(r'about\s+(.+?)(?:\s+in|\s+from|$)', query_lower)
if about_match:
search_terms.append(about_match.group(1))
contain_match = re.search(r'contain(?:ing)?\s+["\']?(.+?)["\']?(?:\s+|$)', query_lower)
if contain_match:
search_terms.append(contain_match.group(1))
for_match = re.search(r'(?:search\s+)?for\s+(.+?)(?:\s+in|\s+from|$)', query_lower)
if for_match:
search_terms.append(for_match.group(1))
# Clean up search terms
if search_terms:
term = search_terms[0].strip(' "\'')
# Remove common stop words
stop_words = ['the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by']
words = [w for w in term.split() if w.lower() not in stop_words and len(w) > 2]
if words:
return ' AND '.join(words)
# Fallback: extract meaningful words from the entire query
meaningful_words = []
for word in query.split():
word_clean = re.sub(r'[^\w]', '', word).lower()
if (len(word_clean) > 3 and
word_clean not in ['find', 'search', 'show', 'documents', 'files', 'about', 'containing']):
meaningful_words.append(word_clean)
if meaningful_words:
return ' AND '.join(meaningful_words[:3]) # Limit to 3 terms
return None
def _translate_to_sql_files(self, query: str) -> Optional[str]:
"""Translate to SQL file listing query."""
query_lower = query.lower()
if any(phrase in query_lower for phrase in ['all files', 'show files', 'list files']):
return "SELECT id, filename, created_at FROM markdown_files ORDER BY created_at DESC LIMIT 20"
return "SELECT filename FROM markdown_files ORDER BY filename"
def _translate_to_sql_stats(self, query: str) -> Optional[str]:
"""Translate to SQL statistics query."""
query_lower = query.lower()
if 'files' in query_lower:
return "SELECT COUNT(*) as file_count FROM markdown_files"
elif 'schema' in query_lower:
return "SELECT COUNT(*) as schema_count FROM schemas"
else:
return "SELECT (SELECT COUNT(*) FROM markdown_files) as files, (SELECT COUNT(*) FROM schemas) as schemas"
def _translate_to_sql_size(self, query: str) -> Optional[str]:
"""Translate to SQL size/length query."""
query_lower = query.lower()
if any(word in query_lower for word in ['largest', 'biggest']):
return "SELECT filename, LENGTH(content) as size FROM markdown_files WHERE content IS NOT NULL ORDER BY size DESC LIMIT 10"
elif any(word in query_lower for word in ['smallest', 'small']):
return "SELECT filename, LENGTH(content) as size FROM markdown_files WHERE content IS NOT NULL ORDER BY size ASC LIMIT 10"
else:
return "SELECT filename, LENGTH(content) as size FROM markdown_files WHERE content IS NOT NULL ORDER BY size DESC LIMIT 10"
def _translate_to_sql_recent(self, query: str) -> Optional[str]:
"""Translate to SQL recent files query."""
return "SELECT filename, created_at FROM markdown_files WHERE created_at > datetime('now', '-7 days') ORDER BY created_at DESC"
def _translate_to_sql_frontmatter(self, query: str) -> Optional[str]:
"""Translate to SQL front matter query."""
return "SELECT filename, front_matter FROM markdown_files WHERE front_matter IS NOT NULL AND front_matter != '{}'"
def _translate_to_graphql_schemas(self, query: str) -> Optional[str]:
"""Translate to GraphQL schema query."""
return """query {
schemas {
filename
title
description
schemaVersion
propertyCount
}
}"""
def _translate_to_graphql_detailed(self, query: str) -> Optional[str]:
"""Translate to detailed GraphQL query."""
query_lower = query.lower()
if 'file' in query_lower:
return """query {
markdownFiles(limit: 10) {
id
filename
wordCount
lineCount
frontMatter {
key
value
}
createdAt
}
}"""
else:
return """query {
databaseStats {
totalFiles
totalSchemas
totalSizeBytes
lastUpdated
}
}"""
def _translate_to_jsonpath(self, query: str) -> Optional[str]:
"""Translate to JSONPath query."""
query_lower = query.lower()
if 'heading' in query_lower:
return "$..heading"
elif 'link' in query_lower:
return "$..link"
elif 'code' in query_lower:
return "$..code_block"
elif 'image' in query_lower:
return "$..image"
else:
return "$..heading" # Default to headings
def can_translate_from(self, other_paradigm: str) -> bool:
    """Always return ``False``: nothing is translated into natural language."""
    supported = False
    return supported
def translate_query(self, query: str, from_paradigm: str) -> Optional[str]:
    """Always return ``None``: nothing is translated into natural language."""
    translation = None
    return translation

View File

@@ -0,0 +1,88 @@
"""
NoSQL Query Languages Paradigm - MongoDB, Cypher, etc.
"""
import time
from typing import Dict, Any, List, Optional
from ..base import BaseQueryParadigm, QueryResult
class NoSQLQueryParadigm(BaseQueryParadigm):
"""NoSQL query paradigm for document and graph databases."""
@property
def name(self) -> str:
    """Return the paradigm name, ``"NoSQL Queries"``."""
    paradigm_name = "NoSQL Queries"
    return paradigm_name
@property
def description(self) -> str:
    """Return the one-line summary of the NoSQL paradigm."""
    summary = "MongoDB-style queries, Cypher for graph traversal, and other NoSQL query languages"
    return summary
@property
def category(self) -> str:
    """Return the category label, ``"structural"``."""
    category_label = "structural"
    return category_label
@property
def complexity(self) -> str:
    """Return the complexity label, ``"advanced"``."""
    complexity_label = "advanced"
    return complexity_label
def execute(self, query: str, config: Dict[str, Any] = None) -> QueryResult:
    """Report that NoSQL execution is not implemented yet.

    No query is run and ``config`` is ignored; the returned ``QueryResult``
    is always unsuccessful and carries placeholder metadata.
    """
    started = time.time()
    elapsed_ms = (time.time() - started) * 1000
    placeholder_metadata = {
        "status": "not_implemented",
        "implementation_issue": "TBD - to be created",
        "description": "NoSQL queries enable document and graph-based data access patterns"
    }
    return QueryResult(
        paradigm=self.name,
        query=query,
        execution_time_ms=elapsed_ms,
        result_count=0,
        results=[],
        metadata=placeholder_metadata,
        success=False,
        error_message="NoSQL Query paradigm not yet implemented."
    )
def get_examples(self) -> List[Dict[str, str]]:
    """Return sample NoSQL queries (MongoDB-style and Cypher).

    Each entry is a dict with ``name``, ``description`` and ``query`` keys.
    """
    catalog = (
        (
            "MongoDB-style find",
            "Find documents with specific criteria",
            "db.files.find({author: 'Alice', tags: {$in: ['tutorial']}})",
        ),
        (
            "Cypher graph traversal",
            "Find related files through tags",
            "MATCH (f:File)-[:HAS_TAG]->(t:Tag)<-[:HAS_TAG]-(related:File) WHERE f.author = 'Alice' RETURN related",
        ),
        (
            "Aggregation pipeline",
            "MongoDB aggregation for statistics",
            "db.files.aggregate([{$group: {_id: '$author', count: {$sum: 1}}}, {$sort: {count: -1}}])",
        ),
    )
    return [
        {"name": title, "description": summary, "query": statement}
        for title, summary, statement in catalog
    ]
def validate_query(self, query: str) -> tuple[bool, Optional[str]]:
    """Check that a NoSQL query is not empty.

    Args:
        query: Query text.

    Returns:
        ``(True, None)`` for any non-blank text, otherwise
        ``(False, reason)``.
    """
    # Guard against None as well as blank text; calling .strip() on None
    # used to raise AttributeError instead of reporting an invalid query.
    if not query or not query.strip():
        return False, "NoSQL query cannot be empty"
    return True, None
def get_syntax_help(self) -> str:
    """Return the short syntax overview for NoSQL queries."""
    help_text = """NoSQL Query Syntax:
MongoDB-style:
db.collection.find({field: value})
db.collection.aggregate([{$match: {field: value}}])
Cypher (Neo4j):
MATCH (n:Label) WHERE n.property = 'value' RETURN n
Supported operations will include find, aggregate, graph traversal, and document manipulation.
"""
    return help_text

View File

@@ -0,0 +1,116 @@
"""
Query By Example (QBE) Paradigm - Visual template-based querying.
"""
import time
from typing import Dict, Any, List, Optional
from ..base import BaseQueryParadigm, QueryResult
class QueryByExampleParadigm(BaseQueryParadigm):
"""Query By Example paradigm for visual template-based data filtering."""
@property
def name(self) -> str:
    """Return the paradigm name, ``"Query By Example"``."""
    paradigm_name = "Query By Example"
    return paradigm_name
@property
def description(self) -> str:
    """Return the one-line summary of the Query By Example paradigm."""
    summary = "Visual template-based queries where users fill in example values to define search criteria"
    return summary
@property
def category(self) -> str:
    """Return the category label, ``"visual"``."""
    category_label = "visual"
    return category_label
@property
def complexity(self) -> str:
    """Return the complexity label, ``"beginner"``."""
    complexity_label = "beginner"
    return complexity_label
def execute(self, query: str, config: Dict[str, Any] = None) -> QueryResult:
    """Report that Query By Example execution is not implemented yet.

    No template is evaluated and ``config`` is ignored; the returned
    ``QueryResult`` is always unsuccessful and carries placeholder metadata.
    """
    started = time.time()
    # Documentation-only paradigm: there is no work to time.
    elapsed_ms = (time.time() - started) * 1000
    placeholder_metadata = {
        "status": "not_implemented",
        "implementation_issue": "TBD - to be created",
        "description": "QBE provides a visual interface where users create templates with example values to define search criteria"
    }
    return QueryResult(
        paradigm=self.name,
        query=query,
        execution_time_ms=elapsed_ms,
        result_count=0,
        results=[],
        metadata=placeholder_metadata,
        success=False,
        error_message="Query By Example paradigm not yet implemented. This paradigm will provide visual templates for filtering data."
    )
def get_examples(self) -> List[Dict[str, str]]:
    """Return sample QBE templates.

    Each entry is a dict with ``name``, ``description`` and ``query`` keys.
    The ``query`` values are valid JSON objects, which is the format this
    paradigm's validator and syntax help require.
    """
    # The templates were previously written with single quotes (Python
    # dict style), which is not valid JSON, so the paradigm's own examples
    # were rejected by its validate_query().
    catalog = (
        (
            "Filter by author",
            "Template to find files by specific author",
            '{"author": "John Smith", "type": "*.md"}',
        ),
        (
            "Date range filter",
            "Template to find files within date range",
            '{"created_after": "2024-01-01", "created_before": "2024-12-31"}',
        ),
        (
            "Tag-based filter",
            "Template to find files with specific tags",
            '{"tags": ["documentation", "api"], "status": "published"}',
        ),
        (
            "Content pattern",
            "Template to find files matching content patterns",
            '{"content_contains": "function", "file_extension": ".py"}',
        ),
    )
    return [
        {"name": title, "description": summary, "query": template}
        for title, summary, template in catalog
    ]
def validate_query(self, query: str) -> tuple[bool, Optional[str]]:
    """Check that a QBE template is a JSON object.

    Args:
        query: Template text, expected to be a JSON object.

    Returns:
        ``(True, None)`` when the text parses to a JSON object, otherwise
        ``(False, reason)``.
    """
    import json

    try:
        template = json.loads(query)
    except (TypeError, ValueError):
        # ValueError covers json.JSONDecodeError; TypeError covers None and
        # other non-text input, which previously escaped as an exception.
        return False, "QBE template must be valid JSON"

    if not isinstance(template, dict):
        return False, "QBE template must be a JSON object"
    return True, None
def get_syntax_help(self) -> str:
    """Return the syntax guide for QBE templates.

    The text describes the JSON template structure, the supported fields
    and one worked example.
    """
    help_text = """Query By Example (QBE) Syntax:
QBE uses JSON templates where you specify example values for the fields you want to filter by:
Structure:
{
"field_name": "example_value",
"another_field": "another_value"
}
Supported Fields:
- author: Author name
- type: File type/extension
- tags: Array of tags
- created_after/created_before: Date filters
- content_contains: Text that should appear in content
- file_extension: Specific file extensions
Example:
{
"author": "Alice Johnson",
"tags": ["tutorial", "beginner"],
"created_after": "2024-01-01"
}
This template finds files by Alice Johnson with tutorial and beginner tags created after Jan 1, 2024.
"""
    return help_text

View File

@@ -0,0 +1,110 @@
"""
Retrieval-Augmented Generation (RAG) Paradigm - LLM + Vector Database.
"""
import time
from typing import Dict, Any, List, Optional
from ..base import BaseQueryParadigm, QueryResult
class RAGParadigm(BaseQueryParadigm):
"""RAG paradigm for LLM-powered semantic search and generation."""
@property
def name(self) -> str:
    """Return the paradigm name, ``"RAG (Retrieval-Augmented Generation)"``."""
    paradigm_name = "RAG (Retrieval-Augmented Generation)"
    return paradigm_name
@property
def description(self) -> str:
    """Return the one-line summary of the RAG paradigm."""
    summary = "Large Language Model retrieves relevant facts from vector database for enhanced responses"
    return summary
@property
def category(self) -> str:
    """Return the category label, ``"semantic"``."""
    category_label = "semantic"
    return category_label
@property
def complexity(self) -> str:
    """Return the complexity label, ``"advanced"``."""
    complexity_label = "advanced"
    return complexity_label
def execute(self, query: str, config: Dict[str, Any] = None) -> QueryResult:
    """Report that RAG execution is not implemented yet.

    No retrieval or generation happens and ``config`` is ignored; the
    returned ``QueryResult`` is always unsuccessful and carries placeholder
    metadata.
    """
    started = time.time()
    elapsed_ms = (time.time() - started) * 1000
    placeholder_metadata = {
        "status": "not_implemented",
        "implementation_issue": "TBD - to be created",
        "description": "RAG combines semantic search with LLM generation for intelligent responses"
    }
    return QueryResult(
        paradigm=self.name,
        query=query,
        execution_time_ms=elapsed_ms,
        result_count=0,
        results=[],
        metadata=placeholder_metadata,
        success=False,
        error_message="RAG paradigm not yet implemented."
    )
def get_examples(self) -> List[Dict[str, str]]:
    """Return sample RAG questions and requests.

    Each entry is a dict with ``name``, ``description`` and ``query`` keys.
    """
    catalog = (
        (
            "Semantic question",
            "Ask questions about content semantically",
            "What are the main configuration options for the API?",
        ),
        (
            "Summarization request",
            "Generate summaries of related content",
            "Summarize all documentation about authentication methods",
        ),
        (
            "Code explanation",
            "Explain code patterns found in files",
            "Explain the error handling patterns used in the codebase",
        ),
        (
            "Comparative analysis",
            "Compare different approaches in documentation",
            "Compare the database migration strategies mentioned in the docs",
        ),
    )
    return [
        {"name": title, "description": summary, "query": request}
        for title, summary, request in catalog
    ]
def validate_query(self, query: str) -> tuple[bool, Optional[str]]:
    """Check that a RAG query is a reasonably descriptive request.

    Args:
        query: Question or request text.

    Returns:
        ``(True, None)`` when the trimmed text has at least 10 characters,
        otherwise ``(False, reason)``.
    """
    # Guard against None as well as blank text; calling .strip() on None
    # used to raise AttributeError instead of reporting an invalid query.
    if not query or not query.strip():
        return False, "RAG query cannot be empty"
    if len(query.strip()) < 10:
        return False, "RAG query should be a descriptive question or request"
    return True, None
def get_syntax_help(self) -> str:
"""Get syntax help for RAG queries."""
return """RAG (Retrieval-Augmented Generation) Syntax:
RAG queries are natural language questions or requests that combine:
1. Semantic retrieval from vector database
2. LLM generation for comprehensive answers
Query Types:
- Questions: "What is...?", "How does...?", "Why...?"
- Summaries: "Summarize...", "Overview of..."
- Comparisons: "Compare...", "Differences between..."
- Analysis: "Analyze...", "Explain the pattern..."
Examples:
"What are the main API endpoints and their purposes?"
"Summarize the security best practices mentioned in the documentation"
"How do I configure the database connection?"
"Compare SQL vs NoSQL approaches discussed in the docs"
The system will:
1. Convert your query to vector embeddings
2. Retrieve relevant document chunks
3. Generate a comprehensive response using LLM
4. Provide source citations
"""

View File

@@ -0,0 +1,149 @@
"""
REST API Paradigm - HTTP-based data access.
"""
import time
from typing import Dict, Any, List, Optional
from ..base import BaseQueryParadigm, QueryResult
class RESTAPIParadigm(BaseQueryParadigm):
    """Documentation-only paradigm describing HTTP/REST style data access.

    ``execute`` is a stub that always reports ``not_implemented``; the class
    supplies request examples, a format validator and syntax help.
    """

    @property
    def name(self) -> str:
        """Display name of the paradigm."""
        return "REST API"

    @property
    def description(self) -> str:
        """One-line summary of the paradigm."""
        return "HTTP-based data access using RESTful endpoints with standard HTTP methods and caching"

    @property
    def category(self) -> str:
        """Category label of the paradigm."""
        return "network"

    @property
    def complexity(self) -> str:
        """Complexity label of the paradigm."""
        return "intermediate"

    def execute(self, query: str, config: Dict[str, Any] = None) -> QueryResult:
        """Return a failed QueryResult flagged as not implemented.

        Args:
            query: The request text; echoed back in the result.
            config: Unused by this stub.

        Returns:
            A QueryResult with ``success=False``, no rows and
            ``metadata['status'] == 'not_implemented'``.
        """
        start_time = time.time()
        # This is a documentation paradigm - not yet implemented
        execution_time = (time.time() - start_time) * 1000
        return QueryResult(
            paradigm=self.name,
            query=query,
            execution_time_ms=execution_time,
            result_count=0,
            results=[],
            metadata={
                "status": "not_implemented",
                "implementation_issue": "TBD - to be created",
                "description": "REST API provides HTTP-based access to MarkiTect data with proper caching and pagination",
            },
            success=False,
            error_message="REST API paradigm not yet implemented. This paradigm will provide HTTP endpoints for data access.",
        )

    def get_examples(self) -> List[Dict[str, str]]:
        """Return sample requests in the ``<METHOD> <URL> [JSON_BODY]`` format.

        Request bodies are written as real JSON (double-quoted) so they match
        the body format documented by ``get_syntax_help``.
        """
        return [
            {
                "name": "List files",
                "description": "GET request to list all markdown files",
                "query": "GET /api/v1/files?type=markdown&limit=20",
            },
            {
                "name": "Search files",
                "description": "GET request with search parameters",
                "query": "GET /api/v1/files/search?q=documentation&author=Alice&tags=tutorial",
            },
            {
                "name": "Get file content",
                "description": "GET specific file with content",
                "query": "GET /api/v1/files/123?include=content,metadata,tags",
            },
            {
                "name": "Update file tags",
                "description": "PATCH request to update file tags",
                "query": 'PATCH /api/v1/files/123/tags {"tags": ["updated", "documentation"]}',
            },
            {
                "name": "Bulk operations",
                "description": "POST request for bulk file operations",
                "query": 'POST /api/v1/files/bulk {"action": "add_tag", "files": [1,2,3], "tag": "archived"}',
            },
        ]

    def validate_query(self, query: str) -> tuple[bool, Optional[str]]:
        """Check that the request has a known HTTP method and an ``/api/`` URL.

        The method and URL may be separated by any run of whitespace; the
        optional body after the URL is not inspected.

        Returns:
            ``(True, None)`` when acceptable, otherwise ``(False, reason)``.
        """
        request = query.strip()
        if not request:
            return False, "REST API request cannot be empty"

        # split(None, 1) tolerates tabs and repeated spaces between the parts,
        # which a plain split(' ') turned into an empty URL.
        parts = request.split(None, 1)
        if len(parts) < 2:
            return False, "REST API request must include HTTP method and URL"

        method = parts[0].upper()
        valid_methods = ['GET', 'POST', 'PUT', 'PATCH', 'DELETE']
        if method not in valid_methods:
            return False, f"HTTP method must be one of: {', '.join(valid_methods)}"

        url = parts[1].split(None, 1)[0]
        if not url.startswith('/api/'):
            return False, "URL must start with /api/"
        return True, None

    def get_syntax_help(self) -> str:
        """Return the help text describing the request format and endpoints."""
        return """REST API Request Syntax:
Format: <METHOD> <URL> [JSON_BODY]
HTTP Methods:
- GET: Retrieve data
- POST: Create new resources
- PUT: Update entire resource
- PATCH: Partial update
- DELETE: Remove resource
Base URL: /api/v1
Endpoints:
- /api/v1/files - File operations
- /api/v1/files/search - Search files
- /api/v1/files/{id} - Specific file operations
- /api/v1/tags - Tag operations
- /api/v1/authors - Author operations
- /api/v1/stats - Statistics
Query Parameters:
- limit: Limit number of results (default: 20, max: 100)
- offset: Skip number of results (for pagination)
- include: Comma-separated fields to include
- sort: Sort field (prefix with - for descending)
- filter[field]: Filter by field value
Examples:
GET /api/v1/files?limit=10&sort=-created_at
GET /api/v1/files/search?q=tutorial&filter[author]=Alice
POST /api/v1/files {"path": "/new/file.md", "content": "# New File"}
PATCH /api/v1/files/123 {"tags": ["updated"]}
Response Format:
{
"data": [...],
"meta": {
"total": 100,
"limit": 20,
"offset": 0,
"has_more": true
}
}
"""

View File

@@ -0,0 +1,197 @@
"""
SQL Query Paradigm - Direct database queries using SQL.
"""
import sqlite3
import time
from typing import Dict, Any, List, Optional
from ..base import BaseQueryParadigm, QueryResult
class SQLQueryParadigm(BaseQueryParadigm):
    """SQL paradigm: runs queries directly against a SQLite database.

    ``validate_query`` restricts input to SELECT statements; ``execute`` runs
    whatever it is given and does not call ``validate_query`` itself.
    """

    # Statement keywords that validate_query refuses anywhere in the query.
    _FORBIDDEN_KEYWORDS = ('DROP', 'DELETE', 'UPDATE', 'INSERT', 'ALTER', 'CREATE')
    # Database file used when the caller's config does not name one.
    _DEFAULT_DB_PATH = 'markitect.db'

    @property
    def name(self) -> str:
        """Display name of the paradigm."""
        return "SQL"

    @property
    def description(self) -> str:
        """One-line summary of the paradigm."""
        return "Direct SQL queries against the MarkiTect database for precise data extraction"

    @property
    def category(self) -> str:
        """Category label of the paradigm."""
        return "structural"

    @property
    def complexity(self) -> str:
        """Complexity label of the paradigm."""
        return "intermediate"

    def execute(self, query: str, config: Dict[str, Any] = None) -> QueryResult:
        """Run ``query`` against the SQLite database and wrap the rows.

        Args:
            query: SQL text handed to SQLite unchanged. This method does not
                validate it; call ``validate_query`` first when the
                SELECT-only restriction is required.
            config: Optional settings. ``config['db_path']`` selects the
                database file; when it is missing or empty, ``markitect.db``
                is used.

        Returns:
            On success, a QueryResult holding one dict per row plus the
            database path and detected query type in ``metadata``. On any
            exception, a QueryResult with ``success=False`` and the exception
            text in ``error_message``.
        """
        start_time = time.time()
        try:
            # Fall back to the default when config is absent OR lacks db_path;
            # a config without the key used to reach sqlite3.connect(None).
            db_path = (config or {}).get('db_path') or self._DEFAULT_DB_PATH
            conn = sqlite3.connect(db_path)
            try:
                conn.row_factory = sqlite3.Row
                rows = conn.execute(query).fetchall()
                results = [dict(row) for row in rows]
            finally:
                # Always release the handle, including when the query fails.
                conn.close()
            execution_time = (time.time() - start_time) * 1000
            return QueryResult(
                paradigm="SQL",
                query=query,
                execution_time_ms=execution_time,
                result_count=len(results),
                results=results,
                metadata={
                    "database_path": db_path,
                    "query_type": self._detect_query_type(query),
                },
                success=True,
            )
        except Exception as e:
            # Boundary handler: every failure is reported through the result
            # object rather than raised to the caller.
            execution_time = (time.time() - start_time) * 1000
            return QueryResult(
                paradigm="SQL",
                query=query,
                execution_time_ms=execution_time,
                result_count=0,
                results=[],
                metadata={},
                success=False,
                error_message=str(e),
            )

    def get_examples(self) -> List[Dict[str, str]]:
        """Return sample SELECT queries as dicts with name, description and query."""
        return [
            {
                "name": "List all files",
                "description": "Get all markdown files with basic info",
                "query": "SELECT id, filename, created_at FROM markdown_files ORDER BY created_at DESC LIMIT 10",
            },
            {
                "name": "Files with front matter",
                "description": "Find files that have front matter metadata",
                "query": "SELECT filename, front_matter FROM markdown_files WHERE front_matter IS NOT NULL AND front_matter != '{}'",
            },
            {
                "name": "Large files",
                "description": "Find files with more than 1000 characters",
                "query": "SELECT filename, LENGTH(content) as size FROM markdown_files WHERE LENGTH(content) > 1000 ORDER BY size DESC",
            },
            {
                "name": "Schema statistics",
                "description": "Get schema counts and information",
                "query": "SELECT COUNT(*) as total_schemas, AVG(LENGTH(schema_content)) as avg_size FROM schemas",
            },
            {
                "name": "Recent activity",
                "description": "Show recent file activity",
                "query": "SELECT filename, created_at FROM markdown_files WHERE created_at > datetime('now', '-7 days') ORDER BY created_at DESC",
            },
        ]

    def validate_query(self, query: str) -> tuple[bool, Optional[str]]:
        """Accept only SELECT statements free of data/schema-changing keywords.

        Keywords are matched as whole words, so identifiers such as
        ``created_at`` or ``updated_at`` are not mistaken for ``CREATE`` or
        ``UPDATE``. The check is still conservative: a forbidden word inside
        a string literal is rejected as well.

        Returns:
            ``(True, None)`` when acceptable, otherwise ``(False, reason)``.
        """
        import re

        if not query or not query.strip():
            return False, "Query cannot be empty"

        query_upper = query.upper().strip()
        # Only allow SELECT queries for safety
        if not query_upper.startswith('SELECT'):
            return False, "Only SELECT queries are allowed for safety"

        # Whole-word match: '_' counts as a word character, so \bCREATE\b does
        # not fire on CREATED_AT.
        for keyword in self._FORBIDDEN_KEYWORDS:
            if re.search(rf"\b{keyword}\b", query_upper):
                return False, f"Keyword '{keyword}' is not allowed for safety"
        return True, None

    def get_syntax_help(self) -> str:
        """Return the help text describing supported SQL and available tables."""
        return """SQL Query Syntax:
Basic Structure:
SELECT columns FROM table WHERE condition ORDER BY column
Available Tables:
- markdown_files (id, filename, content, front_matter, created_at)
- schemas (id, filename, title, description, schema_content, created_at, updated_at)
Common Functions:
- LENGTH(column) - Get text length
- datetime('now') - Current timestamp
- datetime('now', '-7 days') - Date arithmetic
Examples:
SELECT * FROM markdown_files LIMIT 5
SELECT filename FROM markdown_files WHERE content LIKE '%TODO%'
SELECT COUNT(*) FROM schemas WHERE title IS NOT NULL
Safety Notes:
- Only SELECT queries are allowed
- No data modification operations (INSERT, UPDATE, DELETE)
- No schema changes (CREATE, ALTER, DROP)
"""

    def _detect_query_type(self, query: str) -> str:
        """Classify a query as aggregation, join, filtered_select, simple_select or unknown."""
        query_upper = query.upper().strip()
        if not query_upper.startswith('SELECT'):
            return "unknown"
        if any(func in query_upper for func in ('COUNT(', 'SUM(', 'AVG(')):
            return "aggregation"
        if 'JOIN' in query_upper:
            return "join"
        if 'WHERE' in query_upper:
            return "filtered_select"
        return "simple_select"

    @staticmethod
    def _normalize_paradigm_name(paradigm_name: str) -> str:
        """Map a paradigm identifier or display name to lower snake case."""
        return paradigm_name.strip().lower().replace(' ', '_')

    def can_translate_from(self, other_paradigm: str) -> bool:
        """Report whether queries of ``other_paradigm`` can be translated to SQL.

        Accepts both the identifier form (``natural_language``) and the
        display name (``Natural Language``).
        """
        # Could potentially translate simple natural language to SQL
        return self._normalize_paradigm_name(other_paradigm) in ["natural_language"]

    def translate_query(self, query: str, from_paradigm: str) -> Optional[str]:
        """Translate ``query`` from another paradigm to SQL.

        Returns:
            The SQL text, or None when the source paradigm is unsupported or
            the query matches no known pattern.
        """
        if self._normalize_paradigm_name(from_paradigm) == "natural_language":
            return self._translate_natural_language_to_sql(query)
        return None

    def _translate_natural_language_to_sql(self, query: str) -> Optional[str]:
        """Map a few fixed English phrases to canned SELECT statements.

        Patterns are tried in order and the first match wins; None is
        returned when nothing matches.
        """
        query_lower = query.lower()
        # Simple pattern matching for common requests
        if "all files" in query_lower or "list files" in query_lower:
            return "SELECT id, filename, created_at FROM markdown_files ORDER BY created_at DESC"
        if "recent files" in query_lower:
            return "SELECT filename, created_at FROM markdown_files WHERE created_at > datetime('now', '-7 days') ORDER BY created_at DESC"
        if "large files" in query_lower or "big files" in query_lower:
            return "SELECT filename, LENGTH(content) as size FROM markdown_files WHERE LENGTH(content) > 1000 ORDER BY size DESC"
        if "schemas" in query_lower and "count" in query_lower:
            return "SELECT COUNT(*) as total_schemas FROM schemas"
        if "front matter" in query_lower:
            return "SELECT filename, front_matter FROM markdown_files WHERE front_matter IS NOT NULL AND front_matter != '{}'"
        return None

View File

@@ -0,0 +1,116 @@
"""
Data Transformation Paradigm - JSON/YAML/XML serialization.
"""
import time
from typing import Dict, Any, List, Optional
from ..base import BaseQueryParadigm, QueryResult
class DataTransformationParadigm(BaseQueryParadigm):
    """Documentation-only paradigm describing format conversion commands.

    ``execute`` is a stub that always reports ``not_implemented``; the class
    supplies command examples, a validator and syntax help.
    """

    @property
    def name(self) -> str:
        """Display name of the paradigm."""
        return "Data Transformation"

    @property
    def description(self) -> str:
        """One-line summary of the paradigm."""
        return "Serialize and transform data between JSON, YAML, XML, and other formats for application use"

    @property
    def category(self) -> str:
        """Category label of the paradigm."""
        return "procedural"

    @property
    def complexity(self) -> str:
        """Complexity label of the paradigm."""
        return "intermediate"

    def execute(self, query: str, config: Dict[str, Any] = None) -> QueryResult:
        """Return a failed QueryResult flagged as not implemented.

        Args:
            query: The command text; echoed back in the result.
            config: Unused by this stub.

        Returns:
            A QueryResult with ``success=False``, no rows and
            ``metadata['status'] == 'not_implemented'``.
        """
        start_time = time.time()
        execution_time = (time.time() - start_time) * 1000
        return QueryResult(
            paradigm=self.name,
            query=query,
            execution_time_ms=execution_time,
            result_count=0,
            results=[],
            metadata={
                "status": "not_implemented",
                "implementation_issue": "TBD - to be created",
                "description": "Data transformation enables format conversion and serialization",
            },
            success=False,
            error_message="Data Transformation paradigm not yet implemented.",
        )

    def get_examples(self) -> List[Dict[str, str]]:
        """Return sample ``transform`` commands as dicts with name, description and query."""
        return [
            {
                "name": "Export to JSON",
                "description": "Transform file metadata to JSON",
                "query": "transform --input=files --output=json --format=compact",
            },
            {
                "name": "Convert to YAML",
                "description": "Export configuration as YAML",
                "query": "transform --input=config --output=yaml --pretty",
            },
            {
                "name": "Generate XML",
                "description": "Create XML from file structure",
                "query": "transform --input=files --output=xml --schema=file-manifest",
            },
            {
                "name": "Custom template",
                "description": "Apply custom transformation template",
                "query": "transform --template=custom.jinja2 --input=files --output=html",
            },
        ]

    def validate_query(self, query: str) -> tuple[bool, Optional[str]]:
        """Check that the command is non-empty and begins with ``transform``.

        Surrounding whitespace is ignored for both checks.

        Returns:
            ``(True, None)`` when acceptable, otherwise ``(False, reason)``.
        """
        # Strip once and test the same text in both checks, so a command with
        # leading whitespace is not rejected by the prefix test.
        command = query.strip()
        if not command:
            return False, "Transformation command cannot be empty"
        if not command.startswith('transform'):
            return False, "Command must start with 'transform'"
        return True, None

    def get_syntax_help(self) -> str:
        """Return the help text describing the ``transform`` command options."""
        return """Data Transformation Syntax:
Basic Format:
transform --input=<source> --output=<format> [options]
Input Sources:
--input=files - File metadata
--input=tags - Tag information
--input=config - Configuration data
--input=stats - Statistics data
Output Formats:
--output=json - JSON format
--output=yaml - YAML format
--output=xml - XML format
--output=csv - CSV format
--output=html - HTML format
Options:
--pretty - Pretty-print output
--compact - Compact output
--template=<file> - Custom template
--schema=<name> - Use predefined schema
--filter=<expression> - Filter data
Examples:
transform --input=files --output=json --pretty
transform --input=tags --output=yaml --filter="count > 5"
transform --template=report.html --input=stats --output=html
"""

View File

@@ -0,0 +1,107 @@
"""
UNIX Pipeline Paradigm - Stream processing with awk, sed, perl.
"""
import time
from typing import Dict, Any, List, Optional
from ..base import BaseQueryParadigm, QueryResult
class UNIXPipelineParadigm(BaseQueryParadigm):
    """Documentation-only paradigm describing UNIX-style stream processing.

    ``execute`` is a stub that always reports ``not_implemented``; examples,
    validation and syntax help are available.
    """

    @property
    def name(self) -> str:
        """Display name of the paradigm."""
        return "UNIX Pipeline"

    @property
    def description(self) -> str:
        """One-line summary of the paradigm."""
        return "Stream processing with UNIX tools like awk, sed, grep, sort for line-by-line data manipulation"

    @property
    def category(self) -> str:
        """Category label of the paradigm."""
        return "procedural"

    @property
    def complexity(self) -> str:
        """Complexity label of the paradigm."""
        return "advanced"

    def execute(self, query: str, config: Dict[str, Any] = None) -> QueryResult:
        """Return a failed QueryResult flagged as not implemented.

        Args:
            query: The pipeline text; echoed back in the result.
            config: Unused by this stub.

        Returns:
            A QueryResult with ``success=False``, no rows and
            ``metadata['status'] == 'not_implemented'``.
        """
        started = time.time()
        elapsed_ms = (time.time() - started) * 1000
        stub_metadata = {
            "status": "not_implemented",
            "implementation_issue": "TBD - to be created",
            "description": "UNIX pipelines enable powerful stream processing of MarkiTect data",
        }
        return QueryResult(
            paradigm=self.name,
            query=query,
            execution_time_ms=elapsed_ms,
            result_count=0,
            results=[],
            metadata=stub_metadata,
            success=False,
            error_message="UNIX Pipeline paradigm not yet implemented.",
        )

    def get_examples(self) -> List[Dict[str, str]]:
        """Return sample pipelines as dicts with name, description and query."""
        samples = [
            (
                "Filter and count",
                "Find files by author and count",
                "markitect export --format=csv | grep 'Alice' | wc -l",
            ),
            (
                "Extract and sort",
                "Extract unique authors and sort",
                "markitect export --format=csv | cut -d',' -f3 | sort | uniq -c | sort -rn",
            ),
            (
                "Complex awk processing",
                "Process file metadata with awk",
                "markitect export --format=csv | awk -F',' '{if($4>1000) print $1,$2}' | sort",
            ),
            (
                "Sed text transformation",
                "Transform file paths using sed",
                "markitect list-files | sed 's|/old/path|/new/path|g' | sort",
            ),
        ]
        return [
            {"name": title, "description": summary, "query": text}
            for title, summary, text in samples
        ]

    def validate_query(self, query: str) -> tuple[bool, Optional[str]]:
        """Check that the query is non-empty and looks like a pipeline.

        A query passes when it contains a ``|`` or any of the tool names
        grep, awk, sed, sort, cut, wc as a substring.

        Returns:
            ``(True, None)`` when acceptable, otherwise ``(False, reason)``.
        """
        if not query.strip():
            return False, "UNIX pipeline cannot be empty"

        known_tools = ['grep', 'awk', 'sed', 'sort', 'cut', 'wc']
        has_pipe = '|' in query
        mentions_tool = any(tool in query for tool in known_tools)
        if not (has_pipe or mentions_tool):
            return False, "Query should contain UNIX pipeline commands"
        return True, None

    def get_syntax_help(self) -> str:
        """Return the help text describing pipeline structure and common tools."""
        help_text = """UNIX Pipeline Syntax:
Basic Structure:
markitect <export_command> | <unix_tools> | <more_tools>
Common Tools:
- grep: Filter lines matching pattern
- awk: Process fields and records
- sed: Stream editor for text transformation
- sort: Sort lines
- uniq: Remove duplicate lines
- cut: Extract fields
- wc: Count lines/words/characters
Examples:
markitect export --format=csv | grep 'documentation' | cut -d',' -f1,2
markitect list-files | awk '{print $1}' | sort | uniq
markitect export --format=csv | sed 's/old/new/g' | grep -v '^#'
The pipeline starts with MarkiTect data export and processes it through UNIX tools.
"""
        return help_text

View File

@@ -0,0 +1,135 @@
"""
Visual Query Builder Paradigm - Drag-and-drop query construction.
"""
import time
from typing import Dict, Any, List, Optional
from ..base import BaseQueryParadigm, QueryResult
class VisualQueryBuilderParadigm(BaseQueryParadigm):
    """Documentation-only paradigm describing a drag-and-drop query builder.

    Queries are JSON objects describing the visual elements. ``execute`` is a
    stub that always reports ``not_implemented``.
    """

    @property
    def name(self) -> str:
        """Display name of the paradigm."""
        return "Visual Query Builder"

    @property
    def description(self) -> str:
        """One-line summary of the paradigm."""
        return "Drag-and-drop interface for building complex queries visually, generates SQL/GraphQL automatically"

    @property
    def category(self) -> str:
        """Category label of the paradigm."""
        return "visual"

    @property
    def complexity(self) -> str:
        """Complexity label of the paradigm."""
        return "beginner"

    def execute(self, query: str, config: Dict[str, Any] = None) -> QueryResult:
        """Return a failed QueryResult flagged as not implemented.

        Args:
            query: The JSON configuration text; echoed back in the result.
            config: Unused by this stub.

        Returns:
            A QueryResult with ``success=False``, no rows and
            ``metadata['status'] == 'not_implemented'``.
        """
        start_time = time.time()
        # This is a documentation paradigm - not yet implemented
        execution_time = (time.time() - start_time) * 1000
        return QueryResult(
            paradigm=self.name,
            query=query,
            execution_time_ms=execution_time,
            result_count=0,
            results=[],
            metadata={
                "status": "not_implemented",
                "implementation_issue": "TBD - to be created",
                "description": "Visual query builder provides drag-and-drop interface for constructing complex queries",
            },
            success=False,
            error_message="Visual Query Builder paradigm not yet implemented. This paradigm will provide a web-based drag-and-drop interface.",
        )

    def get_examples(self) -> List[Dict[str, str]]:
        """Return sample configurations as dicts with name, description and query.

        Each ``query`` is valid JSON (double-quoted), so every example passes
        ``validate_query``; Python-repr style single quotes would not parse.
        """
        return [
            {
                "name": "Simple filter",
                "description": "Drag file table, add author filter",
                "query": '{"tables": ["files"], "filters": [{"field": "author", "operator": "equals", "value": "Alice"}]}',
            },
            {
                "name": "Join with aggregation",
                "description": "Join files and tags, count by tag",
                "query": '{"tables": ["files", "tags"], "joins": [{"type": "inner", "on": "file_id"}], "groupBy": ["tag_name"], "aggregates": [{"function": "count", "field": "*"}]}',
            },
            {
                "name": "Date range with sorting",
                "description": "Files created in last month, sorted by date",
                "query": '{"tables": ["files"], "filters": [{"field": "created_at", "operator": "greater_than", "value": "30 days ago"}], "orderBy": [{"field": "created_at", "direction": "desc"}]}',
            },
            {
                "name": "Complex multi-table",
                "description": "Files with tags and author info, filtered by multiple criteria",
                "query": '{"tables": ["files", "tags", "authors"], "joins": [{"type": "left", "on": "file_id"}, {"type": "inner", "on": "author_id"}], "filters": [{"field": "tag_name", "operator": "in", "value": ["documentation", "tutorial"]}, {"field": "author.department", "operator": "equals", "value": "Engineering"}]}',
            },
        ]

    def validate_query(self, query: str) -> tuple[bool, Optional[str]]:
        """Check that the query is a JSON object with a non-empty ``tables`` list.

        Returns:
            ``(True, None)`` when acceptable, otherwise ``(False, reason)``.
        """
        import json

        try:
            spec = json.loads(query)
        except (json.JSONDecodeError, TypeError):
            # TypeError covers non-string input such as None, reported the
            # same way as unparsable text instead of escaping the validator.
            return False, "Visual query configuration must be valid JSON"

        if not isinstance(spec, dict):
            return False, "Visual query configuration must be a JSON object"
        if 'tables' not in spec:
            return False, "Visual query must specify at least one table"
        tables = spec['tables']
        if not isinstance(tables, list) or len(tables) == 0:
            return False, "Tables must be a non-empty list"
        return True, None

    def get_syntax_help(self) -> str:
        """Return the help text describing the JSON configuration format."""
        return """Visual Query Builder Configuration:
The visual query builder uses JSON configuration that represents the visual elements:
Basic Structure:
{
"tables": ["table1", "table2"],
"joins": [{"type": "inner|left|right", "on": "field_name"}],
"filters": [{"field": "field_name", "operator": "equals|contains|greater_than|in", "value": "value"}],
"groupBy": ["field1", "field2"],
"aggregates": [{"function": "count|sum|avg|min|max", "field": "field_name"}],
"orderBy": [{"field": "field_name", "direction": "asc|desc"}],
"limit": 100
}
Filter Operators:
- equals: Exact match
- contains: Text contains substring
- greater_than, less_than: Numeric/date comparison
- in: Value in list
- between: Value between two values
Aggregate Functions:
- count: Count records
- sum: Sum numeric values
- avg: Average of numeric values
- min/max: Minimum/maximum values
Example:
{
"tables": ["files"],
"filters": [
{"field": "author", "operator": "equals", "value": "Alice"},
{"field": "created_at", "operator": "greater_than", "value": "2024-01-01"}
],
"orderBy": [{"field": "created_at", "direction": "desc"}]
}
"""

View File

@@ -0,0 +1,115 @@
"""
XPath/XQuery Paradigm - Path-based data extraction.
"""
import time
from typing import Dict, Any, List, Optional
from ..base import BaseQueryParadigm, QueryResult
class XPathParadigm(BaseQueryParadigm):
    """Documentation-only paradigm describing XPath/XQuery path expressions.

    ``execute`` is a stub that always reports ``not_implemented``; examples,
    validation and syntax help are available.
    """

    @property
    def name(self) -> str:
        """Display name of the paradigm."""
        return "XPath/XQuery"

    @property
    def description(self) -> str:
        """One-line summary of the paradigm."""
        return "Path-based data extraction from structured documents using XPath and XQuery syntax"

    @property
    def category(self) -> str:
        """Category label of the paradigm."""
        return "structural"

    @property
    def complexity(self) -> str:
        """Complexity label of the paradigm."""
        return "advanced"

    def execute(self, query: str, config: Dict[str, Any] = None) -> QueryResult:
        """Return a failed QueryResult flagged as not implemented.

        Args:
            query: The path expression; echoed back in the result.
            config: Unused by this stub.

        Returns:
            A QueryResult with ``success=False``, no rows and
            ``metadata['status'] == 'not_implemented'``.
        """
        start_time = time.time()
        execution_time = (time.time() - start_time) * 1000
        return QueryResult(
            paradigm=self.name,
            query=query,
            execution_time_ms=execution_time,
            result_count=0,
            results=[],
            metadata={
                "status": "not_implemented",
                "implementation_issue": "TBD - to be created",
                "description": "XPath enables precise navigation through document structures",
            },
            success=False,
            error_message="XPath/XQuery paradigm not yet implemented.",
        )

    def get_examples(self) -> List[Dict[str, str]]:
        """Return sample path expressions as dicts with name, description and query."""
        return [
            {
                "name": "Select files by attribute",
                "description": "Find all files with specific author",
                "query": "//file[@author='Alice']",
            },
            {
                "name": "Deep path selection",
                "description": "Select nested content elements",
                "query": "//file/content/section[contains(@title, 'Introduction')]",
            },
            {
                "name": "Conditional selection",
                "description": "Select files with multiple conditions",
                "query": "//file[@type='markdown' and @size > 1000]/tags/tag",
            },
            {
                "name": "Position-based selection",
                "description": "Select first 3 files by creation date",
                "query": "//file[position() <= 3 and @created > '2024-01-01']",
            },
        ]

    def validate_query(self, query: str) -> tuple[bool, Optional[str]]:
        """Check that the query is non-empty and starts with ``/`` or ``//``.

        Surrounding whitespace is ignored for both checks.

        Returns:
            ``(True, None)`` when acceptable, otherwise ``(False, reason)``.
        """
        # Strip once and test the same text in both checks, so leading
        # whitespace does not cause a spurious prefix failure.
        path = query.strip()
        if not path:
            return False, "XPath query cannot be empty"
        # A '//' prefix also starts with '/', so one test covers both forms.
        if not path.startswith('/'):
            return False, "XPath query must start with / or //"
        return True, None

    def get_syntax_help(self) -> str:
        """Return the help text describing path selection, predicates and functions."""
        return """XPath/XQuery Syntax:
Basic Path Selection:
//element - Select all elements anywhere
/root/element - Select from root
element[@attribute='value'] - Select by attribute
Predicates:
//file[@author='Alice'] - Attribute equals
//file[position() <= 3] - Position-based
//file[contains(@tags, 'tutorial')] - Text contains
Functions:
position() - Element position
contains(string, substring) - Text contains
count(elements) - Count elements
text() - Get text content
Document Structure (conceptual):
//file - All files
//file/@author - All author attributes
//file/tags/tag - All tags in files
//file[content/section] - Files with sections
Examples:
//file[@type='markdown']
//file/tags/tag[text()='documentation']
//file[@created > '2024-01-01' and @author='Alice']
"""

View File

@@ -0,0 +1,68 @@
"""
Registry for managing query paradigms.
"""
from typing import Dict, List, Optional
from .base import BaseQueryParadigm
class QueryParadigmRegistry:
    """In-memory registry for storing and discovering query paradigms.

    Paradigms are keyed by their lower-cased ``name``, so lookups are
    case-insensitive and registering a second paradigm with the same name
    replaces the first.
    """

    def __init__(self):
        """Create an empty registry."""
        self._paradigms: Dict[str, BaseQueryParadigm] = {}

    def register(self, paradigm: BaseQueryParadigm) -> None:
        """Store ``paradigm`` under its lower-cased name."""
        self._paradigms[paradigm.name.lower()] = paradigm

    def get(self, name: str) -> Optional[BaseQueryParadigm]:
        """Return the paradigm registered under ``name`` (any case), or None."""
        return self._paradigms.get(name.lower())

    def list_all(self) -> List[BaseQueryParadigm]:
        """Return every registered paradigm in registration order."""
        return list(self._paradigms.values())

    def list_by_category(self, category: str) -> List[BaseQueryParadigm]:
        """Return the paradigms whose ``category`` equals ``category`` exactly."""
        return [p for p in self._paradigms.values() if p.category == category]

    def list_by_complexity(self, complexity: str) -> List[BaseQueryParadigm]:
        """Return the paradigms whose ``complexity`` equals ``complexity`` exactly."""
        return [p for p in self._paradigms.values() if p.complexity == complexity]

    def get_categories(self) -> List[str]:
        """Return the distinct categories, sorted alphabetically.

        Sorting keeps the order stable between runs; iterating a set of
        strings directly does not.
        """
        return sorted({p.category for p in self._paradigms.values()})

    def get_complexity_levels(self) -> List[str]:
        """Return the distinct complexity levels, sorted alphabetically."""
        return sorted({p.complexity for p in self._paradigms.values()})

    def search_paradigms(self, query: str) -> List[BaseQueryParadigm]:
        """Return paradigms whose name or description contains ``query``.

        Matching is a case-insensitive substring test.
        """
        needle = query.lower()
        return [
            paradigm
            for paradigm in self._paradigms.values()
            if needle in paradigm.name.lower() or needle in paradigm.description.lower()
        ]

    def get_translation_matrix(self) -> Dict[str, List[str]]:
        """Map each paradigm name to the names of paradigms it can be translated into.

        A target is listed for a source when
        ``target.can_translate_from(source.name)`` is truthy.
        """
        paradigms = list(self._paradigms.values())
        return {
            source.name: [
                target.name
                for target in paradigms
                if target.can_translate_from(source.name)
            ]
            for source in paradigms
        }


# Global registry instance
registry = QueryParadigmRegistry()

View File

@@ -0,0 +1,333 @@
"""
Tests for query paradigm system - Issue #62
"""
import pytest
import json
from markitect.query_paradigms.registry import registry
from markitect.query_paradigms.base import BaseQueryParadigm, QueryResult
from markitect.query_paradigms.paradigms.sql_paradigm import SQLQueryParadigm
from markitect.query_paradigms.paradigms.fts_paradigm import FullTextSearchParadigm
from markitect.query_paradigms.paradigms.qbe_paradigm import QueryByExampleParadigm
class TestQueryParadigmRegistry:
    """Checks against the shared, pre-populated paradigm registry."""

    def test_registry_has_paradigms(self):
        """The registry holds at least 14 paradigms, including the key ones."""
        registered = registry.list_all()
        assert len(registered) >= 14  # We expect at least 14 paradigms
        # Check that key paradigms are present
        names = {p.name for p in registered}
        for expected in ("SQL", "FTS", "GraphQL", "Natural Language"):
            assert expected in names

    def test_get_paradigm_by_name(self):
        """Lookup by name works, including case-insensitively."""
        sql = registry.get("SQL")
        assert sql is not None
        assert (sql.name, sql.category) == ("SQL", "structural")
        # Test case insensitive lookup
        fts = registry.get("fts")
        assert fts is not None
        assert fts.name == "FTS"

    def test_get_nonexistent_paradigm(self):
        """An unknown name yields None."""
        assert registry.get("NonExistentParadigm") is None

    def test_list_by_category(self):
        """Category filtering returns only, and at least one, matching paradigm."""
        for category in ("structural", "textual"):
            matches = registry.list_by_category(category)
            assert len(matches) > 0
            assert all(p.category == category for p in matches)

    def test_list_by_complexity(self):
        """Complexity filtering returns only, and at least one, matching paradigm."""
        matches = registry.list_by_complexity("beginner")
        assert len(matches) > 0
        assert all(p.complexity == "beginner" for p in matches)

    def test_search_paradigms(self):
        """Search matches on name and description and returns nothing for junk."""
        # Search by name
        by_name = registry.search_paradigms("SQL")
        assert len(by_name) > 0
        assert "SQL" in [p.name for p in by_name]
        # Search by description
        by_description = registry.search_paradigms("visual")
        assert len(by_description) > 0
        assert any("visual" in p.description.lower() for p in by_description)
        # Search for non-existent term
        assert len(registry.search_paradigms("xyznonexistent")) == 0

    def test_get_categories(self):
        """The category list is a non-empty list containing the expected labels."""
        categories = registry.get_categories()
        assert isinstance(categories, list)
        assert len(categories) > 0
        for expected in ("structural", "textual", "semantic"):
            assert expected in categories

    def test_get_complexity_levels(self):
        """The complexity list is a non-empty list containing all three levels."""
        levels = registry.get_complexity_levels()
        assert isinstance(levels, list)
        assert len(levels) > 0
        for expected in ("beginner", "intermediate", "advanced"):
            assert expected in levels
class TestSQLParadigm:
    """Checks for SQLQueryParadigm metadata, validation, examples and help."""

    def test_paradigm_properties(self):
        """Name, category, complexity and description carry the expected values."""
        sql = SQLQueryParadigm()
        assert sql.name == "SQL"
        assert sql.category == "structural"
        assert sql.complexity == "intermediate"
        assert "database" in sql.description.lower()

    def test_validate_query(self):
        """SELECT statements pass; empty and blank queries are rejected."""
        sql = SQLQueryParadigm()
        # Valid queries
        ok, reason = sql.validate_query("SELECT * FROM files")
        assert ok
        assert reason is None
        ok, reason = sql.validate_query("SELECT name FROM files WHERE author = 'Alice'")
        assert ok
        # Invalid queries
        ok, reason = sql.validate_query("")
        assert not ok
        assert reason is not None
        ok, reason = sql.validate_query(" ")
        assert not ok

    def test_get_examples(self):
        """Examples are a non-empty list of dicts with the three expected keys."""
        samples = SQLQueryParadigm().get_examples()
        assert isinstance(samples, list)
        assert len(samples) > 0
        for sample in samples:
            for key in ("name", "description", "query"):
                assert key in sample
            assert isinstance(sample["query"], str)

    def test_get_syntax_help(self):
        """Syntax help is a non-empty string that mentions SELECT."""
        text = SQLQueryParadigm().get_syntax_help()
        assert isinstance(text, str)
        assert len(text) > 0
        assert "SELECT" in text
class TestFTSParadigm:
    """Checks for FullTextSearchParadigm metadata, validation and examples."""

    def test_paradigm_properties(self):
        """Name, category, complexity and description carry the expected values."""
        fts = FullTextSearchParadigm()
        assert fts.name == "FTS"
        assert fts.category == "textual"
        assert fts.complexity == "beginner"
        assert "search" in fts.description.lower()

    def test_validate_query(self):
        """Plain, boolean and quoted searches pass; an empty query is rejected."""
        fts = FullTextSearchParadigm()
        # Valid queries
        ok, reason = fts.validate_query("documentation")
        assert ok
        assert reason is None
        for accepted in ("API AND documentation", '"getting started"'):
            ok, reason = fts.validate_query(accepted)
            assert ok
        # Invalid queries
        ok, reason = fts.validate_query("")
        assert not ok
        assert reason is not None

    def test_get_examples(self):
        """Examples are a non-empty list including the simple and boolean cases."""
        samples = FullTextSearchParadigm().get_examples()
        assert isinstance(samples, list)
        assert len(samples) > 0
        # Check for expected example types
        titles = [sample["name"] for sample in samples]
        for expected in ("Simple search", "Boolean search"):
            assert expected in titles
class TestQueryByExampleParadigm:
    """Checks for the documentation-only Query By Example paradigm."""

    def test_paradigm_properties(self):
        """Name, category, complexity and description carry the expected values."""
        qbe = QueryByExampleParadigm()
        assert qbe.name == "Query By Example"
        assert qbe.category == "visual"
        assert qbe.complexity == "beginner"
        assert "template" in qbe.description.lower()

    def test_validate_query(self):
        """JSON object templates pass; empty, non-JSON and non-object input fail."""
        qbe = QueryByExampleParadigm()
        # Valid JSON templates
        ok, reason = qbe.validate_query('{"author": "Alice"}')
        assert ok
        assert reason is None
        ok, reason = qbe.validate_query('{"tags": ["tutorial"], "type": "markdown"}')
        assert ok
        # Invalid queries
        ok, reason = qbe.validate_query("")
        assert not ok
        assert reason is not None
        # Each rejected input must name the problem in its message.
        for rejected, expected_word in (
            ("not json", "JSON"),
            ('["not", "an", "object"]', "object"),
        ):
            ok, reason = qbe.validate_query(rejected)
            assert not ok
            assert expected_word in reason

    def test_execute_returns_not_implemented(self):
        """Execution yields a failed QueryResult flagged as not implemented."""
        outcome = QueryByExampleParadigm().execute('{"author": "Alice"}')
        assert isinstance(outcome, QueryResult)
        assert not outcome.success
        assert outcome.error_message is not None
        assert "not yet implemented" in outcome.error_message.lower()
        assert outcome.metadata["status"] == "not_implemented"

    def test_get_syntax_help(self):
        """Syntax help is a non-empty string mentioning JSON and templates."""
        text = QueryByExampleParadigm().get_syntax_help()
        assert isinstance(text, str)
        assert len(text) > 0
        assert "JSON" in text
        assert "template" in text.lower()
class TestQueryResult:
    """Tests covering the QueryResult data structure."""

    def test_query_result_creation(self):
        """Verify that a successful QueryResult stores every field it was given."""
        rows = [{"id": 1}, {"id": 2}, {"id": 3}]
        fields = {
            "paradigm": "Test",
            "query": "test query",
            "execution_time_ms": 10.5,
            "result_count": 3,
            "results": rows,
            "metadata": {"type": "test"},
            "success": True,
        }
        outcome = QueryResult(**fields)

        assert outcome.paradigm == "Test"
        assert outcome.query == "test query"
        assert outcome.execution_time_ms == 10.5
        assert outcome.result_count == 3
        assert len(outcome.results) == 3
        assert outcome.metadata["type"] == "test"
        assert outcome.success is True
        # error_message was not supplied, so it is expected to be None.
        assert outcome.error_message is None

    def test_query_result_with_error(self):
        """Verify that a failed QueryResult keeps its error message and zero count."""
        fields = {
            "paradigm": "Test",
            "query": "bad query",
            "execution_time_ms": 1.0,
            "result_count": 0,
            "results": [],
            "metadata": {},
            "success": False,
            "error_message": "Query failed",
        }
        outcome = QueryResult(**fields)

        assert not outcome.success
        assert outcome.error_message == "Query failed"
        assert outcome.result_count == 0
class TestBaseQueryParadigm:
    """Tests covering the base query paradigm interface."""

    def test_cannot_instantiate_base_class(self):
        """Verify that instantiating BaseQueryParadigm directly raises TypeError."""
        with pytest.raises(TypeError):
            BaseQueryParadigm()

    def test_paradigm_interface(self):
        """Verify that a concrete paradigm exposes the full interface."""
        sql = SQLQueryParadigm()

        required_properties = ("name", "description", "category", "complexity")
        required_methods = ("execute", "get_examples", "validate_query", "get_syntax_help")
        optional_methods = ("can_translate_from", "translate_query")

        # Checked in the order: properties, required methods, optional methods.
        for member in required_properties + required_methods + optional_methods:
            assert hasattr(sql, member)
if __name__ == "__main__":
    # Propagate pytest's exit status so that running this file directly exits
    # non-zero when a test fails; a bare pytest.main() call discards the
    # returned code and the process would always exit 0.
    raise SystemExit(pytest.main([__file__]))