Phase 0 - Project Organization: - Create docs/PROJECT_STRUCTURE.md documenting codebase layout - Create markitect/core/ with parser, serializer, document_manager, workspace - Create markitect/schema/ consolidating 6 schema_*.py modules - Create markitect/storage/ with database module - Maintain backward compatibility via re-exports from original locations - Add docs/roadmap/information-space-service/ with README and WORKPLAN Phase 1 - Foundation (Weeks 1-3): - Week 1: Core domain models (InformationSpace, SpaceDocument, SpaceConfig, SpaceMetadata, SpaceVariable, TransclusionReference, SpaceStatus) - Week 2: Repository layer with interfaces (ISpaceRepository, IDocumentAssociationRepository, IVariableRepository, IReferenceRepository) and SQLite implementations with foreign key cascade deletes - Week 3: SpaceService orchestration layer with full CRUD, document, variable, and reference tracking operations Test coverage: 124 tests (25 model + 63 repository + 36 integration) Capabilities delivered: - CAP-001: InformationSpace entity with lifecycle management - CAP-002: SpaceRepository CRUD with SQLite backing - CAP-003: Document-Space associations with path-based organization - CAP-004: Space metadata and configuration schemas - CAP-005: Database schema with migrations Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
445 lines
13 KiB
Python
445 lines
13 KiB
Python
"""
|
|
Database management functionality for MarkiTect.
|
|
|
|
This module provides SQLite database initialization, markdown file storage
|
|
with front matter support, and JSON schema storage (Issue #3).
|
|
"""
|
|
|
|
import sqlite3
|
|
import json
|
|
import os
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional, Dict, Any
|
|
|
|
from markitect.frontmatter import FrontMatterParser
|
|
|
|
|
|
class DatabaseManager:
    """Manager for SQLite database operations.

    Every public method opens its own short-lived connection to ``db_path``
    and closes it before returning, including when an error is raised.
    """

    def __init__(self, db_path: str):
        """
        Initialize database manager.

        Args:
            db_path: Path to SQLite database file
        """
        self.db_path = db_path
        self.front_matter_parser = FrontMatterParser()

    def _connect(self) -> sqlite3.Connection:
        """Open a new connection to the configured database file."""
        return sqlite3.connect(self.db_path)

    def initialize_database(self) -> None:
        """
        Initialize SQLite database with required tables.

        Creates the markdown_files table with the following schema:
        - id: INTEGER PRIMARY KEY
        - filename: TEXT NOT NULL
        - front_matter: TEXT (JSON)
        - content: TEXT
        - created_at: TIMESTAMP DEFAULT CURRENT_TIMESTAMP

        Also creates the schemas table (Issue #3) and then initializes the
        finance schema if the finance module is available.
        """
        # Ensure the parent directory exists. exist_ok avoids the
        # check-then-create race of a separate os.path.exists() test.
        db_dir = os.path.dirname(self.db_path)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)

        conn = self._connect()
        try:
            # Create markdown_files table
            conn.execute('''
                CREATE TABLE IF NOT EXISTS markdown_files (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    filename TEXT NOT NULL,
                    front_matter TEXT,
                    content TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            # Create schemas table for Issue #3
            conn.execute('''
                CREATE TABLE IF NOT EXISTS schemas (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    filename TEXT NOT NULL UNIQUE,
                    title TEXT,
                    description TEXT,
                    schema_content TEXT NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            conn.commit()
        finally:
            # Close even if table creation fails so the handle never leaks.
            conn.close()

        # Initialize finance schema if available
        self.initialize_finance_schema()

    def initialize_finance_schema(self) -> None:
        """
        Initialize finance schema for cost tracking (Issue #88).

        This method is called automatically during database initialization
        to set up cost tracking tables if the finance module is available.
        It is best-effort: it never raises. A missing finance module is
        skipped silently; any other failure is recorded at DEBUG level.
        """
        try:
            from .finance.models import FinanceModels
        except ImportError:
            # Finance module not available, skip initialization
            return

        try:
            FinanceModels(self.db_path).initialize_finance_schema()
        except Exception as exc:
            # Deliberately non-fatal for CLI compatibility. DEBUG keeps the
            # default output unchanged while leaving a trace for diagnosis.
            import logging

            logging.getLogger(__name__).debug(
                "Finance schema initialization skipped: %s", exc
            )

    def store_markdown_file(self, filename: str, content: str) -> Optional[int]:
        """
        Store a markdown file in the database.

        Args:
            filename: Name of the markdown file
            content: Raw markdown content with optional front matter

        Returns:
            ID of the inserted record, or None if insertion failed
        """
        # Parse front matter and content
        front_matter, markdown_content = self.front_matter_parser.parse(content)

        # Convert front matter to JSON string; empty/missing becomes '{}'
        front_matter_json = json.dumps(front_matter) if front_matter else '{}'

        conn = self._connect()
        try:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO markdown_files (filename, front_matter, content, created_at)
                VALUES (?, ?, ?, ?)
            ''', (filename, front_matter_json, markdown_content, datetime.now().isoformat()))

            record_id = cursor.lastrowid
            conn.commit()
            return record_id
        except sqlite3.Error:
            conn.rollback()
            return None
        finally:
            conn.close()

    def get_markdown_file(self, filename: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve a markdown file from the database.

        Args:
            filename: Name of the markdown file to retrieve

        Returns:
            Dictionary containing file data (id, filename, front_matter,
            content, created_at), or None if not found
        """
        conn = self._connect()
        try:
            row = conn.execute('''
                SELECT id, filename, front_matter, content, created_at
                FROM markdown_files
                WHERE filename = ?
            ''', (filename,)).fetchone()
        finally:
            conn.close()

        if not row:
            return None

        return {
            'id': row[0],
            'filename': row[1],
            # front_matter is stored as a JSON string; NULL/empty -> {}
            'front_matter': json.loads(row[2]) if row[2] else {},
            'content': row[3],
            'created_at': row[4],
        }

    def list_markdown_files(self) -> list:
        """
        List all markdown files in the database.

        Returns:
            List of dictionaries containing file metadata (id, filename,
            front_matter, created_at), newest first
        """
        conn = self._connect()
        try:
            rows = conn.execute('''
                SELECT id, filename, front_matter, created_at
                FROM markdown_files
                ORDER BY created_at DESC
            ''').fetchall()
        finally:
            conn.close()

        return [
            {
                'id': row[0],
                'filename': row[1],
                'front_matter': json.loads(row[2]) if row[2] else {},
                'created_at': row[3],
            }
            for row in rows
        ]

    def execute_query(self, sql: str) -> list:
        """
        Execute a read-only SQL query against the database.

        Args:
            sql: SQL query string (SELECT operations only; WITH is accepted
                so that common table expressions can be used)

        Returns:
            List of dictionaries representing query results

        Raises:
            ValueError: If query contains non-SELECT operations
            sqlite3.Error: If query execution fails
        """
        # Kept as a function-scope import, as in the original implementation.
        import re

        # Security check: only allow SELECT queries (and WITH for CTEs)
        sql_upper = sql.strip().upper()
        if not sql_upper.startswith(('SELECT', 'WITH')):
            raise ValueError("Only SELECT and WITH queries are allowed for safety")

        # Additional safety checks for dangerous keywords (as whole words).
        # Checked in list order so the reported keyword is deterministic.
        dangerous_keywords = (
            'DROP', 'DELETE', 'UPDATE', 'INSERT', 'CREATE', 'ALTER',
            'TRUNCATE', 'REPLACE', 'PRAGMA',
        )
        for keyword in dangerous_keywords:
            if re.search(r'\b' + keyword + r'\b', sql_upper):
                raise ValueError(f"Query contains dangerous keyword: {keyword}")

        conn = self._connect()
        try:
            # Defense in depth: have SQLite itself reject any write on this
            # connection, in case a statement slips past the keyword filter.
            conn.execute("PRAGMA query_only = ON")
            conn.row_factory = sqlite3.Row  # Enable column access by name
            return [dict(row) for row in conn.execute(sql).fetchall()]
        finally:
            # Runs for every exception type, not only sqlite3.Error.
            conn.close()

    def get_schema(self) -> dict:
        """
        Get database schema information.

        Returns:
            Dictionary mapping each table name to ``{'columns': [...]}``,
            where every column entry has the keys name, type, nullable,
            default_value and primary_key

        Raises:
            sqlite3.Error: If reading the schema fails
        """
        conn = self._connect()
        try:
            # Get all table names
            table_names = [
                row[0]
                for row in conn.execute(
                    "SELECT name FROM sqlite_master WHERE type='table'"
                )
            ]

            schema = {}
            for table_name in table_names:
                # PRAGMA arguments cannot be bound as parameters, so quote
                # the name as an identifier; this keeps names containing
                # spaces, quotes or reserved words from breaking the query.
                quoted = '"' + table_name.replace('"', '""') + '"'
                columns = conn.execute(f"PRAGMA table_info({quoted})").fetchall()

                schema[table_name] = {
                    'columns': [
                        {
                            'name': col[1],
                            'type': col[2],
                            'nullable': not bool(col[3]),  # notnull flag
                            'default_value': col[4],
                            'primary_key': bool(col[5]),
                        }
                        for col in columns
                    ]
                }

            return schema
        finally:
            conn.close()

    # Schema management methods for Issue #3
    def store_schema_file(self, filename: str, schema_content: str) -> Optional[int]:
        """
        Store a JSON schema file in the database.

        If a schema with the same filename already exists it is updated in
        place; otherwise a new record is inserted.

        Args:
            filename: Name of the schema file
            schema_content: JSON schema content as string

        Returns:
            ID of the inserted/updated record, or None if operation failed
            (invalid JSON, JSON that is not an object, or a database error)
        """
        # Parse and validate JSON schema
        try:
            schema_data = json.loads(schema_content)
        except json.JSONDecodeError:
            return None

        # Valid JSON that is not an object (list, string, number, ...) has
        # no title/description to read and is not a usable schema.
        if not isinstance(schema_data, dict):
            return None

        title = schema_data.get('title', filename)
        description = schema_data.get('description', '')
        # One timestamp per call so created_at == updated_at on insert.
        timestamp = datetime.now().isoformat()

        conn = self._connect()
        try:
            cursor = conn.cursor()

            # Check if schema already exists
            cursor.execute('SELECT id FROM schemas WHERE filename = ?', (filename,))
            existing = cursor.fetchone()

            if existing:
                # Update existing schema
                cursor.execute('''
                    UPDATE schemas
                    SET title = ?, description = ?, schema_content = ?, updated_at = ?
                    WHERE filename = ?
                ''', (title, description, schema_content, timestamp, filename))
                record_id = existing[0]
            else:
                # Insert new schema
                cursor.execute('''
                    INSERT INTO schemas (filename, title, description, schema_content, created_at, updated_at)
                    VALUES (?, ?, ?, ?, ?, ?)
                ''', (filename, title, description, schema_content, timestamp, timestamp))
                record_id = cursor.lastrowid

            conn.commit()
            return record_id
        except sqlite3.Error:
            conn.rollback()
            return None
        finally:
            conn.close()

    def get_schema_file(self, filename: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve a schema file from the database.

        Args:
            filename: Name of the schema file to retrieve

        Returns:
            Dictionary containing schema data, or None if not found
        """
        conn = self._connect()
        try:
            row = conn.execute('''
                SELECT id, filename, title, description, schema_content, created_at, updated_at
                FROM schemas
                WHERE filename = ?
            ''', (filename,)).fetchone()
        finally:
            conn.close()

        if not row:
            return None

        return {
            'id': row[0],
            'filename': row[1],
            'title': row[2],
            'description': row[3],
            'schema_content': row[4],
            'created_at': row[5],
            'updated_at': row[6],
        }

    def list_schema_files(self) -> list:
        """
        List all schema files in the database.

        Returns:
            List of dictionaries containing schema metadata (without the
            schema content), most recently updated first
        """
        conn = self._connect()
        try:
            rows = conn.execute('''
                SELECT id, filename, title, description, created_at, updated_at
                FROM schemas
                ORDER BY updated_at DESC
            ''').fetchall()
        finally:
            conn.close()

        return [
            {
                'id': row[0],
                'filename': row[1],
                'title': row[2],
                'description': row[3],
                'created_at': row[4],
                'updated_at': row[5],
            }
            for row in rows
        ]

    def delete_schema_file(self, filename: str) -> bool:
        """
        Delete a schema file from the database.

        Args:
            filename: Name of the schema file to delete

        Returns:
            True if a record was deleted, False if no record matched or a
            database error occurred
        """
        conn = self._connect()
        try:
            cursor = conn.cursor()
            cursor.execute('DELETE FROM schemas WHERE filename = ?', (filename,))
            # rowcount is 0 when no schema with that filename existed
            success = cursor.rowcount > 0
            conn.commit()
            return success
        except sqlite3.Error:
            conn.rollback()
            return False
        finally:
            conn.close()
|