Implement comprehensive database query interface with multiple output formats: • Add query command for executing read-only SQL queries with security constraints • Add schema command for database structure inspection • Add metadata command for file information display • Support table, JSON, and YAML output formats across all commands • Implement SQL injection prevention and safety checks • Add tabulate dependency for enhanced table formatting • Create 35 comprehensive tests covering all functionality This delivers the core USP "Relational Document Metadata" by making the database fully queryable through CLI commands with multiple output formats. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
260 lines
7.5 KiB
Python
260 lines
7.5 KiB
Python
"""
|
|
Database management functionality for MarkiTect.
|
|
|
|
This module provides SQLite database initialization and markdown file storage
|
|
with front matter support.
|
|
"""
|
|
|
|
import sqlite3
|
|
import json
|
|
import os
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional, Dict, Any
|
|
|
|
from .frontmatter import FrontMatterParser
|
|
|
|
|
|
class DatabaseManager:
    """Manager for SQLite database operations.

    Each method opens its own short-lived connection to ``db_path`` and
    closes it before returning (also when an error is raised), so no
    connection is kept on the instance.
    """

    # Prefixes a query must start with to be accepted by execute_query().
    _ALLOWED_QUERY_STARTS = ('SELECT', 'WITH')  # WITH allows CTEs

    # Whole-word keywords that cause execute_query() to reject a query,
    # checked in this order (the first match is the one reported).
    _DANGEROUS_KEYWORDS = (
        'DROP', 'DELETE', 'UPDATE', 'INSERT', 'CREATE', 'ALTER',
        'TRUNCATE', 'REPLACE', 'PRAGMA',
    )

    def __init__(self, db_path: str):
        """
        Initialize database manager.

        Args:
            db_path: Path to SQLite database file
        """
        self.db_path = db_path
        self.front_matter_parser = FrontMatterParser()

    def initialize_database(self) -> None:
        """
        Initialize SQLite database with required tables.

        Creates the parent directory of ``db_path`` when it is missing, then
        creates the markdown_files table (if it does not exist yet) with the
        following schema:
        - id: INTEGER PRIMARY KEY AUTOINCREMENT
        - filename: TEXT NOT NULL
        - front_matter: TEXT (JSON)
        - content: TEXT
        - created_at: TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        """
        # Ensure directory exists; exist_ok tolerates another process
        # creating it between the check and the call.
        db_dir = os.path.dirname(self.db_path)
        if db_dir and not os.path.exists(db_dir):
            os.makedirs(db_dir, exist_ok=True)

        conn = sqlite3.connect(self.db_path)
        try:
            # Create markdown_files table
            conn.execute('''
                CREATE TABLE IF NOT EXISTS markdown_files (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    filename TEXT NOT NULL,
                    front_matter TEXT,
                    content TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            conn.commit()
        finally:
            conn.close()

    def store_markdown_file(self, filename: str, content: str) -> Optional[int]:
        """
        Store a markdown file in the database.

        The content is split by ``self.front_matter_parser.parse`` into front
        matter and body; the front matter is stored as a JSON string
        (``'{}'`` when it is empty).

        Args:
            filename: Name of the markdown file
            content: Raw markdown content with optional front matter

        Returns:
            ID of the inserted record, or None if insertion failed
        """
        # Parse front matter and content
        front_matter, markdown_content = self.front_matter_parser.parse(content)

        # Convert front matter to JSON string
        front_matter_json = json.dumps(front_matter) if front_matter else '{}'

        # Bind the timestamp as text explicitly. sqlite3's implicit datetime
        # adapter is deprecated since Python 3.12; it stored isoformat(' '),
        # so the value written to the database is unchanged.
        created_at = datetime.now().isoformat(' ')

        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.execute('''
                INSERT INTO markdown_files (filename, front_matter, content, created_at)
                VALUES (?, ?, ?, ?)
            ''', (filename, front_matter_json, markdown_content, created_at))
            record_id = cursor.lastrowid
            conn.commit()
            return record_id
        except sqlite3.Error:
            conn.rollback()
            return None
        finally:
            conn.close()

    def get_markdown_file(self, filename: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve a markdown file from the database.

        The query has no ORDER BY, so if several rows share the same
        filename, whichever row SQLite yields first is returned.

        Args:
            filename: Name of the markdown file to retrieve

        Returns:
            Dictionary with the keys id, filename, front_matter (decoded
            from JSON, ``{}`` when empty), content and created_at, or None
            if not found
        """
        conn = sqlite3.connect(self.db_path)
        try:
            row = conn.execute('''
                SELECT id, filename, front_matter, content, created_at
                FROM markdown_files
                WHERE filename = ?
            ''', (filename,)).fetchone()
        finally:
            conn.close()

        if not row:
            return None

        return {
            'id': row[0],
            'filename': row[1],
            'front_matter': json.loads(row[2]) if row[2] else {},
            'content': row[3],
            'created_at': row[4],
        }

    def list_markdown_files(self) -> list:
        """
        List all markdown files in the database.

        Returns:
            List of dictionaries containing file metadata (id, filename,
            front_matter, created_at), newest created_at first. The file
            content is not included.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            rows = conn.execute('''
                SELECT id, filename, front_matter, created_at
                FROM markdown_files
                ORDER BY created_at DESC
            ''').fetchall()
        finally:
            conn.close()

        return [
            {
                'id': row[0],
                'filename': row[1],
                'front_matter': json.loads(row[2]) if row[2] else {},
                'created_at': row[3],
            }
            for row in rows
        ]

    def execute_query(self, sql: str) -> list:
        """
        Execute a read-only SQL query against the database.

        The query text is screened before execution: it must start with
        SELECT or WITH (case-insensitive, surrounding whitespace ignored) and
        must not contain any of the blocked keywords as a whole word. The
        screen is purely textual, so a blocked word inside a string literal
        is rejected too.

        Args:
            sql: SQL query string (SELECT operations only)

        Returns:
            List of dictionaries representing query results

        Raises:
            ValueError: If query contains non-SELECT operations
            sqlite3.Error: If query execution fails
        """
        import re

        # Security check: only allow SELECT queries (and WITH for CTEs)
        sql_upper = sql.strip().upper()
        if not sql_upper.startswith(self._ALLOWED_QUERY_STARTS):
            raise ValueError("Only SELECT and WITH queries are allowed for safety")

        # Additional safety checks for dangerous keywords (as whole words)
        for keyword in self._DANGEROUS_KEYWORDS:
            # Use word boundaries to match only complete words
            if re.search(r'\b' + keyword + r'\b', sql_upper):
                raise ValueError(f"Query contains dangerous keyword: {keyword}")

        conn = sqlite3.connect(self.db_path)
        try:
            conn.row_factory = sqlite3.Row  # Enable column access by name
            # Defense in depth: have SQLite itself refuse writes on this
            # connection in case the textual screen above misses something.
            conn.execute("PRAGMA query_only = ON")
            # Convert rows to dictionaries
            return [dict(row) for row in conn.execute(sql)]
        finally:
            conn.close()

    def get_schema(self) -> dict:
        """
        Get database schema information.

        Returns:
            Dictionary mapping each table name found in sqlite_master to
            ``{'columns': [...]}``, where every column entry has the keys
            name, type, nullable, default_value and primary_key.

        Raises:
            sqlite3.Error: If reading the schema fails
        """
        schema = {}

        conn = sqlite3.connect(self.db_path)
        try:
            # Get all table names
            tables = conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table'"
            ).fetchall()

            for (table_name,) in tables:
                # Quote the identifier so names containing spaces, quotes or
                # reserved words do not break the PRAGMA statement.
                quoted_name = '"' + table_name.replace('"', '""') + '"'

                # Get column information for each table; each row is
                # (cid, name, type, notnull, dflt_value, pk).
                columns = conn.execute(
                    f"PRAGMA table_info({quoted_name})"
                ).fetchall()

                schema[table_name] = {
                    'columns': [
                        {
                            'name': col[1],
                            'type': col[2],
                            'nullable': not bool(col[3]),  # notnull flag
                            'default_value': col[4],
                            'primary_key': bool(col[5]),
                        }
                        for col in columns
                    ]
                }

            return schema
        finally:
            conn.close()