Files
markitect-main/markitect/database.py
tegwick 1840d0654d feat: Complete Issue #14 - Database Query CLI Interface MAJOR MILESTONE
Implement comprehensive database query interface with multiple output formats:

• Add query command for executing read-only SQL queries with security constraints
• Add schema command for database structure inspection
• Add metadata command for file information display
• Support table, JSON, and YAML output formats across all commands
• Implement SQL injection prevention and safety checks
• Add tabulate dependency for enhanced table formatting
• Create 35 comprehensive tests covering all functionality

This delivers the core USP "Relational Document Metadata" by making the
database fully queryable through CLI commands with multiple output formats.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-25 03:30:10 +02:00

260 lines
7.5 KiB
Python

"""
Database management functionality for MarkiTect.
This module provides SQLite database initialization and markdown file storage
with front matter support.
"""
import json
import os
import re
import sqlite3
from contextlib import closing
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

from .frontmatter import FrontMatterParser
class DatabaseManager:
    """Manager for SQLite database operations.

    Every method opens its own short-lived connection to ``db_path`` and
    closes it before returning, including when an exception is raised.
    """

    # Statement prefixes accepted by execute_query (WITH enables CTEs).
    _ALLOWED_PREFIXES = ('SELECT', 'WITH')

    # Keywords rejected anywhere in a query. The order matters: the first
    # keyword in this tuple that is found is the one named in the error.
    _DANGEROUS_KEYWORDS = (
        'DROP', 'DELETE', 'UPDATE', 'INSERT', 'CREATE', 'ALTER',
        'TRUNCATE', 'REPLACE', 'PRAGMA',
    )

    # Whole-word patterns, compiled once instead of on every query.
    _DANGEROUS_PATTERNS = tuple(
        (keyword, re.compile(r'\b' + keyword + r'\b'))
        for keyword in _DANGEROUS_KEYWORDS
    )

    def __init__(self, db_path: str):
        """
        Initialize database manager.

        Args:
            db_path: Path to SQLite database file
        """
        self.db_path = db_path
        self.front_matter_parser = FrontMatterParser()

    def initialize_database(self) -> None:
        """
        Initialize SQLite database with required tables.

        Creates the parent directory of ``db_path`` when it is missing, then
        creates the markdown_files table (if it does not exist yet) with the
        following schema:
        - id: INTEGER PRIMARY KEY AUTOINCREMENT
        - filename: TEXT NOT NULL
        - front_matter: TEXT (JSON)
        - content: TEXT
        - created_at: TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        """
        db_dir = os.path.dirname(self.db_path)
        if db_dir:
            # exist_ok avoids the check-then-create race of a separate
            # os.path.exists() test.
            os.makedirs(db_dir, exist_ok=True)

        with closing(sqlite3.connect(self.db_path)) as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS markdown_files (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    filename TEXT NOT NULL,
                    front_matter TEXT,
                    content TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            conn.commit()

    def store_markdown_file(self, filename: str, content: str) -> Optional[int]:
        """
        Store a markdown file in the database.

        Args:
            filename: Name of the markdown file
            content: Raw markdown content with optional front matter

        Returns:
            ID of the inserted record, or None if insertion failed
        """
        # Split the raw text into front matter and the markdown body.
        front_matter, markdown_content = self.front_matter_parser.parse(content)

        # Empty or missing front matter is stored as an empty JSON object.
        front_matter_json = json.dumps(front_matter) if front_matter else '{}'

        # Bind the timestamp as text. Passing a datetime object relies on
        # sqlite3's default adapter, which is deprecated since Python 3.12;
        # isoformat(' ') is exactly the representation that adapter produced.
        created_at = datetime.now().isoformat(' ')

        with closing(sqlite3.connect(self.db_path)) as conn:
            try:
                cursor = conn.execute('''
                    INSERT INTO markdown_files (filename, front_matter, content, created_at)
                    VALUES (?, ?, ?, ?)
                ''', (filename, front_matter_json, markdown_content, created_at))
                record_id = cursor.lastrowid
                conn.commit()
                return record_id
            except sqlite3.Error:
                # Documented contract: database failures yield None.
                conn.rollback()
                return None

    def get_markdown_file(self, filename: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve a markdown file from the database.

        Args:
            filename: Name of the markdown file to retrieve

        Returns:
            Dictionary containing file data, or None if not found. When
            several rows share the filename, the first row SQLite returns
            is used.

        Raises:
            sqlite3.Error: If the query fails (e.g. the table is missing)
        """
        with closing(sqlite3.connect(self.db_path)) as conn:
            row = conn.execute('''
                SELECT id, filename, front_matter, content, created_at
                FROM markdown_files
                WHERE filename = ?
            ''', (filename,)).fetchone()

        if row is None:
            return None

        return {
            'id': row[0],
            'filename': row[1],
            'front_matter': json.loads(row[2]) if row[2] else {},
            'content': row[3],
            'created_at': row[4],
        }

    def list_markdown_files(self) -> List[Dict[str, Any]]:
        """
        List all markdown files in the database.

        Returns:
            List of dictionaries containing file metadata (without the
            content column), newest first by created_at

        Raises:
            sqlite3.Error: If the query fails (e.g. the table is missing)
        """
        with closing(sqlite3.connect(self.db_path)) as conn:
            rows = conn.execute('''
                SELECT id, filename, front_matter, created_at
                FROM markdown_files
                ORDER BY created_at DESC
            ''').fetchall()

        return [
            {
                'id': row[0],
                'filename': row[1],
                'front_matter': json.loads(row[2]) if row[2] else {},
                'created_at': row[3],
            }
            for row in rows
        ]

    def execute_query(self, sql: str) -> List[Dict[str, Any]]:
        """
        Execute a read-only SQL query against the database.

        Args:
            sql: SQL query string (SELECT operations only)

        Returns:
            List of dictionaries representing query results

        Raises:
            ValueError: If query contains non-SELECT operations
            sqlite3.Error: If query execution fails
        """
        # Security check: the statement must begin with SELECT or WITH.
        sql_upper = sql.strip().upper()
        if not sql_upper.startswith(self._ALLOWED_PREFIXES):
            raise ValueError("Only SELECT and WITH queries are allowed for safety")

        # Reject dangerous keywords appearing as whole words anywhere in the
        # text. NOTE: this is a textual check, so a keyword inside a string
        # literal (e.g. LIKE '%update%') is rejected as well.
        for keyword, pattern in self._DANGEROUS_PATTERNS:
            if pattern.search(sql_upper):
                raise ValueError(f"Query contains dangerous keyword: {keyword}")

        with closing(sqlite3.connect(self.db_path)) as conn:
            # Defence in depth: have SQLite itself refuse writes on this
            # connection instead of trusting the keyword filter alone.
            conn.execute("PRAGMA query_only = ON")
            conn.row_factory = sqlite3.Row  # Enable column access by name
            rows = conn.execute(sql).fetchall()
            return [dict(row) for row in rows]

    def get_schema(self) -> Dict[str, Any]:
        """
        Get database schema information.

        Returns:
            Dictionary mapping each table name to ``{'columns': [...]}``,
            where every column entry has the keys name, type, nullable,
            default_value and primary_key

        Raises:
            sqlite3.Error: If reading the schema fails
        """
        schema = {}
        with closing(sqlite3.connect(self.db_path)) as conn:
            tables = conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table'"
            ).fetchall()

            for (table_name,) in tables:
                # Quote the identifier so that names containing spaces,
                # quotes or reserved words do not break the PRAGMA.
                quoted_name = '"' + table_name.replace('"', '""') + '"'
                columns = conn.execute(
                    f"PRAGMA table_info({quoted_name})"
                ).fetchall()

                # table_info rows: (cid, name, type, notnull, dflt_value, pk)
                schema[table_name] = {
                    'columns': [
                        {
                            'name': col[1],
                            'type': col[2],
                            'nullable': not bool(col[3]),  # notnull flag
                            'default_value': col[4],
                            'primary_key': bool(col[5]),
                        }
                        for col in columns
                    ]
                }
        return schema