Files
markitect-main/markitect/database.py
tegwick 92fa0e9151 fix: Resolve Python 3.12 SQLite datetime adapter deprecation warnings
Fixed the massive number of deprecation warnings generated during test runs
by updating datetime handling in SQLite operations to use ISO format strings
instead of raw datetime objects.

## Problem
- Tests were generating 63+ deprecation warnings per run
- Python 3.12 deprecated the default datetime adapter for SQLite
- Warning: "The default datetime adapter is deprecated as of Python 3.12"

## Solution
- Convert datetime.now() to datetime.now().isoformat() in SQL INSERT
- Uses ISO format strings that SQLite handles natively
- Eliminates dependency on deprecated datetime adapter

## Impact
- Zero deprecation warnings in test runs
- All existing functionality preserved
- Database compatibility maintained
- Clean test output for better debugging

## Files Changed
- markitect/database.py: Updated store_markdown_file() method

This fix improves the development experience by eliminating the flood
of warnings that were obscuring actual test output and issues.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-27 09:40:36 +02:00

260 lines
7.5 KiB
Python

"""
Database management functionality for MarkiTect.
This module provides SQLite database initialization and markdown file storage
with front matter support.
"""
import json
import os
import re
import sqlite3
from contextlib import closing
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, Any

from .frontmatter import FrontMatterParser
class DatabaseManager:
    """Manager for SQLite database operations.

    Every method opens its own short-lived connection to ``db_path`` and
    closes it before returning, including when an error is raised.
    """

    # Statement prefixes accepted by execute_query(); WITH allows CTEs.
    _ALLOWED_QUERY_STARTS = ('SELECT', 'WITH')

    # (keyword, whole-word pattern) pairs rejected by execute_query().
    # Order matters: the first keyword that matches is the one reported.
    _DANGEROUS_KEYWORD_PATTERNS = tuple(
        (keyword, re.compile(r'\b' + keyword + r'\b'))
        for keyword in (
            'DROP', 'DELETE', 'UPDATE', 'INSERT', 'CREATE', 'ALTER',
            'TRUNCATE', 'REPLACE', 'PRAGMA',
        )
    )

    def __init__(self, db_path: str):
        """
        Initialize database manager.

        Args:
            db_path: Path to SQLite database file
        """
        self.db_path = db_path
        self.front_matter_parser = FrontMatterParser()

    @staticmethod
    def _decode_front_matter(raw: Optional[str]) -> Dict[str, Any]:
        """Decode a stored front matter JSON string; empty/NULL yields {}."""
        return json.loads(raw) if raw else {}

    def initialize_database(self) -> None:
        """
        Initialize SQLite database with required tables.

        Creates the parent directory of ``db_path`` when it is missing, then
        creates the markdown_files table (if absent) with the schema:

        - id: INTEGER PRIMARY KEY AUTOINCREMENT
        - filename: TEXT NOT NULL
        - front_matter: TEXT (JSON)
        - content: TEXT
        - created_at: TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        """
        db_dir = os.path.dirname(self.db_path)
        if db_dir:
            # exist_ok avoids the check-then-create race of a separate
            # os.path.exists() test.
            os.makedirs(db_dir, exist_ok=True)

        with closing(sqlite3.connect(self.db_path)) as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS markdown_files (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    filename TEXT NOT NULL,
                    front_matter TEXT,
                    content TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            conn.commit()

    def store_markdown_file(self, filename: str, content: str) -> Optional[int]:
        """
        Store a markdown file in the database.

        Args:
            filename: Name of the markdown file
            content: Raw markdown content with optional front matter

        Returns:
            ID of the inserted record, or None if insertion failed
        """
        # Split the raw text into front matter and the remaining markdown body.
        front_matter, markdown_content = self.front_matter_parser.parse(content)
        front_matter_json = json.dumps(front_matter) if front_matter else '{}'

        # Bind the timestamp as an ISO string: passing a datetime object would
        # rely on sqlite3's default datetime adapter, deprecated in Python 3.12.
        # NOTE(review): this is naive local time with a 'T' separator, whereas
        # rows that fall back to the CURRENT_TIMESTAMP column default get
        # SQLite's own text format -- confirm mixed rows sort as intended.
        created_at = datetime.now().isoformat()

        with closing(sqlite3.connect(self.db_path)) as conn:
            try:
                cursor = conn.execute('''
                    INSERT INTO markdown_files (filename, front_matter, content, created_at)
                    VALUES (?, ?, ?, ?)
                ''', (filename, front_matter_json, markdown_content, created_at))
                record_id = cursor.lastrowid
                conn.commit()
                return record_id
            except sqlite3.Error:
                # Insertion failures are reported to the caller as None.
                conn.rollback()
                return None

    def get_markdown_file(self, filename: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve a markdown file from the database.

        Args:
            filename: Name of the markdown file to retrieve

        Returns:
            Dictionary with id, filename, front_matter (decoded JSON),
            content and created_at of the first matching row, or None if
            no row has that filename

        Raises:
            sqlite3.Error: If the query fails (e.g. the table is missing)
        """
        with closing(sqlite3.connect(self.db_path)) as conn:
            row = conn.execute('''
                SELECT id, filename, front_matter, content, created_at
                FROM markdown_files
                WHERE filename = ?
            ''', (filename,)).fetchone()

        if row is None:
            return None
        return {
            'id': row[0],
            'filename': row[1],
            'front_matter': self._decode_front_matter(row[2]),
            'content': row[3],
            'created_at': row[4],
        }

    def list_markdown_files(self) -> list:
        """
        List all markdown files in the database.

        Returns:
            List of dictionaries (id, filename, front_matter, created_at),
            ordered by created_at descending; content is not included

        Raises:
            sqlite3.Error: If the query fails (e.g. the table is missing)
        """
        with closing(sqlite3.connect(self.db_path)) as conn:
            rows = conn.execute('''
                SELECT id, filename, front_matter, created_at
                FROM markdown_files
                ORDER BY created_at DESC
            ''').fetchall()

        return [
            {
                'id': row[0],
                'filename': row[1],
                'front_matter': self._decode_front_matter(row[2]),
                'created_at': row[3],
            }
            for row in rows
        ]

    def execute_query(self, sql: str) -> list:
        """
        Execute a read-only SQL query against the database.

        The query must start with SELECT or WITH and must not contain any
        write/DDL keyword as a whole word. The keyword scan covers the entire
        query text, so a keyword inside a string literal is rejected as well.

        Args:
            sql: SQL query string (SELECT operations only)

        Returns:
            List of dictionaries representing query results

        Raises:
            ValueError: If query contains non-SELECT operations
            sqlite3.Error: If query execution fails
        """
        normalized = sql.strip().upper()
        if not normalized.startswith(self._ALLOWED_QUERY_STARTS):
            raise ValueError("Only SELECT and WITH queries are allowed for safety")

        for keyword, pattern in self._DANGEROUS_KEYWORD_PATTERNS:
            if pattern.search(normalized):
                raise ValueError(f"Query contains dangerous keyword: {keyword}")

        with closing(sqlite3.connect(self.db_path)) as conn:
            conn.row_factory = sqlite3.Row  # Enable column access by name
            # Defense in depth behind the keyword filter: make this
            # connection refuse any statement that would modify the database.
            conn.execute("PRAGMA query_only = ON")
            return [dict(row) for row in conn.execute(sql).fetchall()]

    def get_schema(self) -> dict:
        """
        Get database schema information.

        Returns:
            Dictionary mapping each table name in sqlite_master to
            ``{'columns': [...]}``, where every column entry has the keys
            name, type, nullable, default_value and primary_key

        Raises:
            sqlite3.Error: If reading the schema fails
        """
        schema = {}
        with closing(sqlite3.connect(self.db_path)) as conn:
            tables = conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table'"
            ).fetchall()

            for (table_name,) in tables:
                # Quote the identifier so names containing spaces or quotes
                # do not break the PRAGMA statement.
                quoted_name = '"' + table_name.replace('"', '""') + '"'
                columns = conn.execute(
                    f"PRAGMA table_info({quoted_name})"
                ).fetchall()

                # table_info rows: (cid, name, type, notnull, dflt_value, pk)
                schema[table_name] = {
                    'columns': [
                        {
                            'name': col[1],
                            'type': col[2],
                            'nullable': not bool(col[3]),  # notnull flag
                            'default_value': col[4],
                            'primary_key': bool(col[5]),
                        }
                        for col in columns
                    ]
                }
        return schema