"""
Database management functionality for MarkiTect.

This module provides SQLite database initialization, markdown file storage
with front matter support, and JSON schema storage (Issue #3).
"""

import sqlite3
import json
import logging
import os
import re
from contextlib import closing
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, Any, List

from .frontmatter import FrontMatterParser

logger = logging.getLogger(__name__)

# Statement prefixes accepted by DatabaseManager.execute_query
# (WITH is allowed so that common table expressions work).
_ALLOWED_QUERY_STARTS = ('SELECT', 'WITH')

# Keywords rejected by DatabaseManager.execute_query when they occur as whole
# words anywhere in the query. Order matters: the first match in this order is
# the one reported in the error message.
_DANGEROUS_KEYWORDS = (
    'DROP', 'DELETE', 'UPDATE', 'INSERT', 'CREATE', 'ALTER',
    'TRUNCATE', 'REPLACE', 'PRAGMA',
)


class DatabaseManager:
    """Manager for SQLite database operations."""

    def __init__(self, db_path: str):
        """
        Initialize database manager.

        Args:
            db_path: Path to SQLite database file
        """
        self.db_path = db_path
        self.front_matter_parser = FrontMatterParser()

    def initialize_database(self) -> None:
        """
        Initialize SQLite database with required tables.

        Creates the parent directory of the database file if needed, then
        creates (if they do not exist yet):

        markdown_files
            - id: INTEGER PRIMARY KEY AUTOINCREMENT
            - filename: TEXT NOT NULL
            - front_matter: TEXT (JSON)
            - content: TEXT
            - created_at: TIMESTAMP DEFAULT CURRENT_TIMESTAMP

        schemas (Issue #3)
            - id: INTEGER PRIMARY KEY AUTOINCREMENT
            - filename: TEXT NOT NULL UNIQUE
            - title: TEXT
            - description: TEXT
            - schema_content: TEXT NOT NULL
            - created_at: TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            - updated_at: TIMESTAMP DEFAULT CURRENT_TIMESTAMP

        Also initializes finance schema if finance module is available.
        """
        # Ensure directory exists. exist_ok avoids the race between an
        # explicit existence check and the directory creation.
        db_dir = os.path.dirname(self.db_path)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)

        # closing() guarantees the connection is released even if a
        # CREATE TABLE statement fails.
        with closing(sqlite3.connect(self.db_path)) as conn:
            cursor = conn.cursor()

            # Create markdown_files table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS markdown_files (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    filename TEXT NOT NULL,
                    front_matter TEXT,
                    content TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            # Create schemas table for Issue #3
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS schemas (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    filename TEXT NOT NULL UNIQUE,
                    title TEXT,
                    description TEXT,
                    schema_content TEXT NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            conn.commit()

        # Initialize finance schema if available
        self.initialize_finance_schema()

    def initialize_finance_schema(self) -> None:
        """
        Initialize finance schema for cost tracking (Issue #88).

        This method is called automatically during database initialization
        to set up cost tracking tables if the finance module is available.
        It is best-effort: a missing finance module is skipped, and any other
        failure is logged at debug level and otherwise ignored.
        """
        try:
            from .finance.models import FinanceModels
        except ImportError:
            # Finance module not available, skip initialization
            return

        try:
            FinanceModels(self.db_path).initialize_finance_schema()
        except Exception:
            # Deliberately non-fatal for CLI compatibility. Debug level keeps
            # the CLI output unchanged while leaving a trace for diagnosis.
            logger.debug(
                "Finance schema initialization failed; continuing without it",
                exc_info=True,
            )

    def store_markdown_file(self, filename: str, content: str) -> Optional[int]:
        """
        Store a markdown file in the database.

        Args:
            filename: Name of the markdown file
            content: Raw markdown content with optional front matter

        Returns:
            ID of the inserted record, or None if insertion failed
        """
        # Parse front matter and content
        front_matter, markdown_content = self.front_matter_parser.parse(content)

        # Convert front matter to JSON string
        front_matter_json = json.dumps(front_matter) if front_matter else '{}'

        with closing(sqlite3.connect(self.db_path)) as conn:
            try:
                cursor = conn.cursor()
                cursor.execute('''
                    INSERT INTO markdown_files (filename, front_matter, content, created_at)
                    VALUES (?, ?, ?, ?)
                ''', (filename, front_matter_json, markdown_content,
                      datetime.now().isoformat()))
                record_id = cursor.lastrowid
                conn.commit()
                return record_id
            except sqlite3.Error:
                conn.rollback()
                return None

    def get_markdown_file(self, filename: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve a markdown file from the database.

        Args:
            filename: Name of the markdown file to retrieve

        Returns:
            Dictionary containing file data (id, filename, front_matter,
            content, created_at), or None if not found

        Raises:
            sqlite3.Error: If the query fails
        """
        with closing(sqlite3.connect(self.db_path)) as conn:
            row = conn.execute('''
                SELECT id, filename, front_matter, content, created_at
                FROM markdown_files
                WHERE filename = ?
            ''', (filename,)).fetchone()

        if row is None:
            return None

        return {
            'id': row[0],
            'filename': row[1],
            # front_matter is stored as JSON text; empty/NULL means none.
            'front_matter': json.loads(row[2]) if row[2] else {},
            'content': row[3],
            'created_at': row[4],
        }

    def list_markdown_files(self) -> List[Dict[str, Any]]:
        """
        List all markdown files in the database.

        Returns:
            List of dictionaries containing file metadata (id, filename,
            front_matter, created_at), newest first

        Raises:
            sqlite3.Error: If the query fails
        """
        with closing(sqlite3.connect(self.db_path)) as conn:
            rows = conn.execute('''
                SELECT id, filename, front_matter, created_at
                FROM markdown_files
                ORDER BY created_at DESC
            ''').fetchall()

        return [
            {
                'id': row[0],
                'filename': row[1],
                'front_matter': json.loads(row[2]) if row[2] else {},
                'created_at': row[3],
            }
            for row in rows
        ]

    def execute_query(self, sql: str) -> List[Dict[str, Any]]:
        """
        Execute a read-only SQL query against the database.

        Args:
            sql: SQL query string (SELECT operations only; WITH is accepted
                for common table expressions)

        Returns:
            List of dictionaries representing query results

        Raises:
            ValueError: If query contains non-SELECT operations
            sqlite3.Error: If query execution fails
        """
        # Security check: only allow SELECT queries (and WITH for CTEs)
        sql_upper = sql.strip().upper()
        if not sql_upper.startswith(_ALLOWED_QUERY_STARTS):
            raise ValueError("Only SELECT and WITH queries are allowed for safety")

        # Additional safety checks for dangerous keywords (as whole words).
        # NOTE: this is a textual scan, so a keyword inside a string literal
        # is rejected as well.
        for keyword in _DANGEROUS_KEYWORDS:
            # Use word boundaries to match only complete words
            if re.search(r'\b' + keyword + r'\b', sql_upper):
                raise ValueError(f"Query contains dangerous keyword: {keyword}")

        with closing(sqlite3.connect(self.db_path)) as conn:
            conn.row_factory = sqlite3.Row  # Enable column access by name
            # Defense in depth: have SQLite itself refuse any write on this
            # connection, independent of the textual checks above.
            conn.execute("PRAGMA query_only = ON")
            rows = conn.execute(sql).fetchall()
            # Convert rows to dictionaries
            return [dict(row) for row in rows]

    def get_schema(self) -> dict:
        """
        Get database schema information.

        Returns:
            Dictionary mapping each table name to ``{'columns': [...]}``,
            where every column entry has the keys name, type, nullable,
            default_value and primary_key

        Raises:
            sqlite3.Error: If reading the schema fails
        """
        schema = {}

        with closing(sqlite3.connect(self.db_path)) as conn:
            cursor = conn.cursor()

            # Get all table names
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
            table_names = [table_row[0] for table_row in cursor.fetchall()]

            for table_name in table_names:
                # Quote the identifier so table names containing spaces or
                # quote characters do not break (or alter) the statement.
                quoted_name = '"' + table_name.replace('"', '""') + '"'
                cursor.execute(f"PRAGMA table_info({quoted_name})")

                # table_info rows: (cid, name, type, notnull, dflt_value, pk)
                schema[table_name] = {
                    'columns': [
                        {
                            'name': col[1],
                            'type': col[2],
                            'nullable': not bool(col[3]),  # notnull flag
                            'default_value': col[4],
                            'primary_key': bool(col[5]),
                        }
                        for col in cursor.fetchall()
                    ]
                }

        return schema

    # Schema management methods for Issue #3

    def store_schema_file(self, filename: str, schema_content: str) -> Optional[int]:
        """
        Store a JSON schema file in the database.

        If a schema with the same filename already exists it is updated in
        place; otherwise a new record is inserted.

        Args:
            filename: Name of the schema file
            schema_content: JSON schema content as string

        Returns:
            ID of the inserted/updated record, or None if operation failed
            (content is not valid JSON, is not a JSON object, or the
            database operation failed)
        """
        try:
            # Parse and validate JSON schema
            schema_data = json.loads(schema_content)
        except json.JSONDecodeError:
            return None

        # title/description are read from the top-level object; valid JSON of
        # any other type (list, number, ...) cannot provide them.
        if not isinstance(schema_data, dict):
            return None

        title = schema_data.get('title', filename)
        description = schema_data.get('description', '')
        # One timestamp so created_at and updated_at agree on insert.
        timestamp = datetime.now().isoformat()

        with closing(sqlite3.connect(self.db_path)) as conn:
            try:
                cursor = conn.cursor()

                # Check if schema already exists
                cursor.execute('SELECT id FROM schemas WHERE filename = ?', (filename,))
                existing = cursor.fetchone()

                if existing:
                    # Update existing schema
                    cursor.execute('''
                        UPDATE schemas
                        SET title = ?, description = ?, schema_content = ?, updated_at = ?
                        WHERE filename = ?
                    ''', (title, description, schema_content, timestamp, filename))
                    record_id = existing[0]
                else:
                    # Insert new schema
                    cursor.execute('''
                        INSERT INTO schemas (filename, title, description, schema_content, created_at, updated_at)
                        VALUES (?, ?, ?, ?, ?, ?)
                    ''', (filename, title, description, schema_content,
                          timestamp, timestamp))
                    record_id = cursor.lastrowid

                conn.commit()
                return record_id
            except sqlite3.Error:
                conn.rollback()
                return None

    def get_schema_file(self, filename: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve a schema file from the database.

        Args:
            filename: Name of the schema file to retrieve

        Returns:
            Dictionary containing schema data (id, filename, title,
            description, schema_content, created_at, updated_at), or None
            if not found

        Raises:
            sqlite3.Error: If the query fails
        """
        with closing(sqlite3.connect(self.db_path)) as conn:
            row = conn.execute('''
                SELECT id, filename, title, description, schema_content, created_at, updated_at
                FROM schemas
                WHERE filename = ?
            ''', (filename,)).fetchone()

        if row is None:
            return None

        return {
            'id': row[0],
            'filename': row[1],
            'title': row[2],
            'description': row[3],
            'schema_content': row[4],
            'created_at': row[5],
            'updated_at': row[6],
        }

    def list_schema_files(self) -> List[Dict[str, Any]]:
        """
        List all schema files in the database.

        Returns:
            List of dictionaries containing schema metadata (id, filename,
            title, description, created_at, updated_at), most recently
            updated first

        Raises:
            sqlite3.Error: If the query fails
        """
        with closing(sqlite3.connect(self.db_path)) as conn:
            rows = conn.execute('''
                SELECT id, filename, title, description, created_at, updated_at
                FROM schemas
                ORDER BY updated_at DESC
            ''').fetchall()

        return [
            {
                'id': row[0],
                'filename': row[1],
                'title': row[2],
                'description': row[3],
                'created_at': row[4],
                'updated_at': row[5],
            }
            for row in rows
        ]

    def delete_schema_file(self, filename: str) -> bool:
        """
        Delete a schema file from the database.

        Args:
            filename: Name of the schema file to delete

        Returns:
            True if deletion was successful (a row was removed),
            False otherwise
        """
        with closing(sqlite3.connect(self.db_path)) as conn:
            try:
                cursor = conn.cursor()
                cursor.execute('DELETE FROM schemas WHERE filename = ?', (filename,))
                success = cursor.rowcount > 0
                conn.commit()
                return success
            except sqlite3.Error:
                conn.rollback()
                return False