Files
markitect-main/infrastructure/repositories/sqlite_repository.py
tegwick 1fa0f1e84a
Some checks failed
Test Suite / unit-tests (3.11) (push) Has been cancelled
Test Suite / unit-tests (3.12) (push) Has been cancelled
Test Suite / integration-tests (push) Has been cancelled
Test Suite / e2e-tests (push) Has been cancelled
Test Suite / performance-tests (push) Has been cancelled
Test Suite / code-quality (push) Has been cancelled
Test Suite / security-scan (push) Has been cancelled
Test Suite / test-summary (push) Has been cancelled
fix: Eliminate all 111 test warnings by fixing root causes
- Replace deprecated datetime.utcnow() with datetime.now(timezone.utc)
  across all domain models, services, infrastructure, and test files
- Add missing timezone imports to all affected files
- Fix pytest.ini configuration format from [tool:pytest] to [pytest]
- Remove warning suppressions to expose actual issues
- Ensure proper pytest marker registration for smoke tests

Results:
- 305 passed, 2 skipped, 0 warnings (down from 111 warnings)
- All functionality preserved with modern datetime API usage
- Improved code quality by addressing root causes vs suppression

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-27 20:14:22 +02:00

677 lines
24 KiB
Python

"""
SQLite repository implementation with transaction support.
Provides efficient database operations with connection pooling,
transaction management, and proper error handling.
"""
import hashlib
import json
import sqlite3
import uuid
from contextlib import asynccontextmanager
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import List, Optional, Dict, Any

from infrastructure.connection_manager import ConnectionManager
from infrastructure.exceptions import (
    ErrorContext, OperationType, DatabaseError, ConnectionError,
    ResourceNotFoundError, DuplicateResourceError, ValidationError,
    TransactionError, QueryError
)
from infrastructure.logging import get_logger
from infrastructure.repositories.interfaces import DocumentRepository, CacheRepository
# Module-level logger used by both repository classes defined below.
logger = get_logger(__name__)
class SqliteDocumentRepository(DocumentRepository):
    """
    SQLite implementation of DocumentRepository with transaction support.

    Documents are stored in the ``documents`` table together with their
    JSON-serialized AST, the SHA-256 hash of their content and the content
    length; the pair (filename, content_hash) is UNIQUE. AST cache metadata
    lives in the ``ast_cache`` table, which this class also creates.

    Writes run inside ``connection_manager.transaction()``; reads use the
    connection returned by ``connection_manager.get_database_connection()``.
    """

    # Column list shared by every SELECT whose rows go through _row_to_document.
    _DOCUMENT_COLUMNS = (
        "id, filename, content, ast_json, content_hash, file_size, created_at, updated_at"
    )

    def __init__(self, connection_manager: ConnectionManager):
        """
        Create the repository and ensure its schema exists.

        Args:
            connection_manager: Provides the SQLite connection used for reads
                and schema creation, and the ``transaction()`` async context
                manager used for writes.

        Raises:
            ConnectionError: (from infrastructure.exceptions) if the schema
                could not be initialized.
        """
        self.connection_manager = connection_manager
        self._initialize_schema()

    @staticmethod
    def _hash_content(content: str) -> str:
        """Return the hex SHA-256 digest of ``content`` encoded as UTF-8."""
        return hashlib.sha256(content.encode()).hexdigest()

    @staticmethod
    def _row_to_document(row) -> Dict[str, Any]:
        """
        Convert a row selected with ``_DOCUMENT_COLUMNS`` into a document dict.

        Raises:
            json.JSONDecodeError: If the stored ``ast_json`` is not valid JSON.
        """
        return {
            "id": row[0],
            "filename": row[1],
            "content": row[2],
            "ast": json.loads(row[3]),
            "content_hash": row[4],
            "file_size": row[5],
            "created_at": row[6],
            "updated_at": row[7],
        }

    def _initialize_schema(self):
        """
        Create the ``documents`` and ``ast_cache`` tables and their indexes.

        All statements use ``IF NOT EXISTS``, so running this against an
        existing database leaves it unchanged.

        Raises:
            ConnectionError: If any schema statement fails (cause chained).
        """
        try:
            conn = self.connection_manager.get_database_connection()
            # Create documents table
            conn.execute("""
                CREATE TABLE IF NOT EXISTS documents (
                    id TEXT PRIMARY KEY,
                    filename TEXT NOT NULL,
                    content TEXT NOT NULL,
                    ast_json TEXT NOT NULL,
                    content_hash TEXT NOT NULL,
                    file_size INTEGER NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    UNIQUE(filename, content_hash)
                )
            """)
            # Create cache table.
            # NOTE(review): SQLite enforces FOREIGN KEY / ON DELETE CASCADE only
            # when "PRAGMA foreign_keys = ON" is set on the connection, which this
            # module does not do; delete_document removes cache rows explicitly.
            conn.execute("""
                CREATE TABLE IF NOT EXISTS ast_cache (
                    id TEXT PRIMARY KEY,
                    document_id TEXT NOT NULL,
                    cache_path TEXT NOT NULL,
                    cache_size INTEGER NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    accessed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (document_id) REFERENCES documents (id) ON DELETE CASCADE
                )
            """)
            # Create indexes for performance
            conn.execute("CREATE INDEX IF NOT EXISTS idx_documents_filename ON documents(filename)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_documents_created_at ON documents(created_at)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_cache_document_id ON ast_cache(document_id)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_cache_accessed_at ON ast_cache(accessed_at)")
            conn.commit()
            logger.info("Database schema initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize database schema: {e}")
            raise ConnectionError("markitect.db", e) from e

    async def store_document(
        self,
        filename: str,
        content: str,
        ast: Dict[str, Any],
        context: Optional[ErrorContext] = None
    ) -> str:
        """
        Store a document with its AST representation.

        Args:
            filename: Document name; must not be empty or whitespace-only.
            content: Document text; must not be empty. Its SHA-256 hash and
                length are stored alongside it.
            ast: AST of the document; must not be empty. Stored as JSON.
            context: Error context; a default one is built when omitted.

        Returns:
            The generated document ID (a UUID4 string).

        Raises:
            ValidationError: If ``filename``, ``content`` or ``ast`` is empty.
            DuplicateResourceError: If a document with the same filename and
                content already exists.
            DatabaseError: On any other SQLite integrity error.
            TransactionError: On any other failure while writing.
        """
        if context is None:
            context = ErrorContext(
                operation_id=f"store_document_{filename}",
                operation_type=OperationType.WRITE,
                resource_type="Document",
                request_data={
                    "filename": filename,
                    # Guard so a None content reaches the ValidationError below
                    # instead of failing here with a TypeError.
                    "content_length": len(content) if content else 0,
                    "ast_keys": list(ast.keys()) if ast else []
                }
            )

        # Validate input
        if not filename or not filename.strip():
            raise ValidationError("filename", filename, "Filename cannot be empty", context)
        if not content:
            raise ValidationError("content", content, "Content cannot be empty", context)
        if not ast:
            raise ValidationError("ast", ast, "AST cannot be empty", context)

        try:
            async with self.connection_manager.transaction() as conn:
                # (filename, content_hash) is UNIQUE; check first so the common
                # duplicate case produces a clear DuplicateResourceError.
                content_hash = self._hash_content(content)
                existing = conn.execute(
                    "SELECT id FROM documents WHERE filename = ? AND content_hash = ?",
                    (filename, content_hash)
                ).fetchone()
                if existing:
                    raise DuplicateResourceError("Document", filename, context)

                document_id = str(uuid.uuid4())
                now = datetime.now(timezone.utc).isoformat()
                conn.execute(
                    """
                    INSERT INTO documents (id, filename, content, ast_json, content_hash, file_size, created_at, updated_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                    """,
                    (document_id, filename, content, json.dumps(ast), content_hash, len(content), now, now)
                )
            logger.info(f"Stored document {filename} with ID {document_id}")
            return document_id
        except DuplicateResourceError:
            # Propagate unchanged; otherwise the generic handler below would
            # wrap it in a TransactionError.
            raise
        except sqlite3.IntegrityError as e:
            # The UNIQUE constraint remains the final guard against duplicates.
            if "UNIQUE constraint failed" in str(e):
                raise DuplicateResourceError("Document", filename, context) from e
            raise DatabaseError(f"Integrity error storing document {filename}", e, context) from e
        except Exception as e:
            logger.error(f"Error storing document {filename}: {e}")
            raise TransactionError(f"store document {filename}", e, context) from e

    async def get_document(
        self,
        document_id: str,
        context: Optional[ErrorContext] = None
    ) -> Dict[str, Any]:
        """
        Retrieve a document by its ID.

        Returns:
            Dict with keys ``id``, ``filename``, ``content``, ``ast`` (parsed
            JSON), ``content_hash``, ``file_size``, ``created_at``, ``updated_at``.

        Raises:
            ResourceNotFoundError: If no document has this ID.
            QueryError: If the query fails or the stored AST is not valid JSON.
        """
        if context is None:
            context = ErrorContext(
                operation_id=f"get_document_{document_id}",
                operation_type=OperationType.READ,
                resource_type="Document",
                resource_id=document_id
            )
        try:
            conn = self.connection_manager.get_database_connection()
            row = conn.execute(
                f"SELECT {self._DOCUMENT_COLUMNS} FROM documents WHERE id = ?",
                (document_id,)
            ).fetchone()
            if not row:
                raise ResourceNotFoundError("Document", document_id, context)
            return self._row_to_document(row)
        except ResourceNotFoundError:
            # Re-raise ResourceNotFoundError as-is
            raise
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse AST JSON for document {document_id}: {e}")
            raise QueryError(
                f"SELECT * FROM documents WHERE id = '{document_id}'",
                {"document_id": document_id},
                e,
                context
            ) from e
        except Exception as e:
            logger.error(f"Error retrieving document {document_id}: {e}")
            raise QueryError(
                f"SELECT * FROM documents WHERE id = '{document_id}'",
                {"document_id": document_id},
                e,
                context
            ) from e

    async def get_documents(
        self,
        filename_pattern: Optional[str] = None,
        limit: int = 100,
        offset: int = 0,
        context: Optional[ErrorContext] = None
    ) -> List[Dict[str, Any]]:
        """
        Retrieve documents newest-first with optional filename filtering.

        Args:
            filename_pattern: If given, only documents whose filename contains
                it (SQL ``LIKE '%pattern%'``) are returned.
            limit: Maximum number of rows to fetch.
            offset: Number of rows to skip.
            context: Error context; a default one is built when omitted.

        Returns:
            List of document dicts (same shape as ``get_document``). Rows whose
            stored AST is not valid JSON are skipped with a warning, so the
            list may be shorter than ``limit`` even when more rows exist.

        Raises:
            QueryError: If the query fails.
        """
        if context is None:
            context = ErrorContext(
                operation_id=f"get_documents_{filename_pattern or 'all'}",
                operation_type=OperationType.READ,
                resource_type="Document",
                metadata={
                    "filename_pattern": filename_pattern,
                    "limit": limit,
                    "offset": offset
                }
            )
        try:
            conn = self.connection_manager.get_database_connection()
            where_clause = ""
            params: List[Any] = []
            if filename_pattern:
                # NOTE(review): the pattern is not escaped, so '%' and '_' inside
                # it act as LIKE wildcards — confirm callers rely on this.
                where_clause = "WHERE filename LIKE ?"
                params.append(f"%{filename_pattern}%")
            params.extend([limit, offset])
            query = (
                f"SELECT {self._DOCUMENT_COLUMNS} FROM documents {where_clause} "
                "ORDER BY created_at DESC LIMIT ? OFFSET ?"
            )

            documents = []
            for row in conn.execute(query, params).fetchall():
                try:
                    documents.append(self._row_to_document(row))
                except json.JSONDecodeError as e:
                    logger.warning(f"Skipping document {row[0]} due to invalid AST JSON: {e}")
            return documents
        except Exception as e:
            logger.error(f"Error retrieving documents: {e}")
            raise QueryError(
                "SELECT documents with pagination", {"limit": limit, "offset": offset}, e, context
            ) from e

    async def update_document(
        self,
        document_id: str,
        content: Optional[str] = None,
        ast: Optional[Dict[str, Any]] = None,
        context: Optional[ErrorContext] = None
    ) -> Dict[str, Any]:
        """
        Update the content and/or AST of an existing document.

        Passing neither ``content`` nor ``ast`` changes nothing and simply
        returns the current document. When ``content`` changes, its hash and
        size are recomputed; ``updated_at`` is refreshed on any change.

        Returns:
            The document as read back after the update (see ``get_document``).

        Raises:
            ResourceNotFoundError: If no document has this ID.
            TransactionError: On any other failure.
        """
        if context is None:
            context = ErrorContext(
                operation_id=f"update_document_{document_id}",
                operation_type=OperationType.UPDATE,
                resource_type="Document",
                resource_id=document_id,
                request_data={
                    "content_length": len(content) if content else None,
                    "ast_keys": list(ast.keys()) if ast else None
                }
            )
        try:
            async with self.connection_manager.transaction() as conn:
                exists = conn.execute(
                    "SELECT id FROM documents WHERE id = ?", (document_id,)
                ).fetchone()
                if not exists:
                    raise ResourceNotFoundError("Document", document_id, context)

                # Column assignments come only from this fixed set, never from
                # caller input, so building the SET clause with an f-string is safe.
                assignments: List[str] = []
                params: List[Any] = []
                if content is not None:
                    assignments += ["content = ?", "content_hash = ?", "file_size = ?"]
                    params += [content, self._hash_content(content), len(content)]
                if ast is not None:
                    assignments.append("ast_json = ?")
                    params.append(json.dumps(ast))

                if assignments:
                    assignments.append("updated_at = ?")
                    params.append(datetime.now(timezone.utc).isoformat())
                    params.append(document_id)  # WHERE id = ?
                    conn.execute(
                        f"UPDATE documents SET {', '.join(assignments)} WHERE id = ?",
                        params
                    )
                    logger.info(f"Updated document {document_id}")

            # Read back outside the transaction block so the result reflects
            # the state after the transaction context has exited.
            return await self.get_document(document_id, context)
        except ResourceNotFoundError:
            # Propagate unchanged rather than wrapping in TransactionError.
            raise
        except Exception as e:
            logger.error(f"Error updating document {document_id}: {e}")
            raise TransactionError(f"update document {document_id}", e, context) from e

    async def delete_document(
        self,
        document_id: str,
        context: Optional[ErrorContext] = None
    ) -> bool:
        """
        Delete a document and its ``ast_cache`` rows.

        Cache files on disk referenced by those rows are not removed here.

        Returns:
            True if the document row was deleted.

        Raises:
            ResourceNotFoundError: If no document has this ID.
            TransactionError: On any other failure.
        """
        if context is None:
            context = ErrorContext(
                operation_id=f"delete_document_{document_id}",
                operation_type=OperationType.DELETE,
                resource_type="Document",
                resource_id=document_id
            )
        try:
            async with self.connection_manager.transaction() as conn:
                exists = conn.execute(
                    "SELECT id FROM documents WHERE id = ?", (document_id,)
                ).fetchone()
                if not exists:
                    raise ResourceNotFoundError("Document", document_id, context)
                # Delete dependent cache rows explicitly rather than relying on
                # ON DELETE CASCADE (see the note in _initialize_schema).
                conn.execute("DELETE FROM ast_cache WHERE document_id = ?", (document_id,))
                deleted = conn.execute(
                    "DELETE FROM documents WHERE id = ?", (document_id,)
                ).rowcount > 0
            if deleted:
                logger.info(f"Deleted document {document_id}")
            return deleted
        except ResourceNotFoundError:
            # Propagate unchanged rather than wrapping in TransactionError.
            raise
        except Exception as e:
            logger.error(f"Error deleting document {document_id}: {e}")
            raise TransactionError(f"delete document {document_id}", e, context) from e

    async def get_cache_path(
        self,
        document_id: str,
        context: Optional[ErrorContext] = None
    ) -> Path:
        """
        Return the path of the most recently stored AST cache for a document.

        Several ``ast_cache`` rows can exist per document; the newest one (by
        ``created_at``, ties broken by insertion order) is returned.

        Raises:
            ResourceNotFoundError: If the document has no cache entry.
            QueryError: If the query fails.
        """
        if context is None:
            context = ErrorContext(
                operation_id=f"get_cache_path_{document_id}",
                operation_type=OperationType.READ,
                resource_type="CachePath",
                resource_id=document_id
            )
        try:
            conn = self.connection_manager.get_database_connection()
            row = conn.execute(
                """
                SELECT cache_path FROM ast_cache
                WHERE document_id = ?
                ORDER BY created_at DESC, rowid DESC
                LIMIT 1
                """,
                (document_id,)
            ).fetchone()
            if not row:
                raise ResourceNotFoundError("Cache", document_id, context)
            return Path(row[0])
        except ResourceNotFoundError:
            # Propagate unchanged rather than wrapping in QueryError.
            raise
        except Exception as e:
            logger.error(f"Error getting cache path for document {document_id}: {e}")
            raise QueryError(
                f"SELECT cache_path FROM ast_cache WHERE document_id = '{document_id}'",
                {"document_id": document_id},
                e,
                context
            ) from e


class SqliteCacheRepository(CacheRepository):
    """
    SQLite implementation of CacheRepository.

    Key/value entries are stored as JSON in the ``cache_entries`` table with an
    optional expiry timestamp. ``store_ast_cache`` additionally writes AST JSON
    files under ``.cache/ast/`` and records them in the ``ast_cache`` table.

    Expiry timestamps are compared with SQLite's ``CURRENT_TIMESTAMP`` (UTC,
    ``YYYY-MM-DD HH:MM:SS``). New expiries are written in that same format,
    and stored values are normalized with SQL ``datetime()`` before comparing.
    This keeps the comparison chronological even for rows written in the
    older ISO-8601 ``YYYY-MM-DDTHH:MM:SS...+00:00`` form; comparing raw
    strings would sort 'T' after ' ' and never expire same-day entries.
    """

    # strftime format matching SQLite's CURRENT_TIMESTAMP text representation.
    _SQLITE_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"

    def __init__(self, connection_manager: ConnectionManager):
        """
        Create the repository and ensure the ``cache_entries`` schema exists.

        Raises:
            ConnectionError: (from infrastructure.exceptions) if the schema
                could not be initialized.
        """
        self.connection_manager = connection_manager
        self._initialize_cache_schema()

    def _initialize_cache_schema(self):
        """
        Create the ``cache_entries`` table and its indexes if missing.

        Raises:
            ConnectionError: If any schema statement fails (cause chained).
        """
        try:
            conn = self.connection_manager.get_database_connection()
            # Create cache entries table
            conn.execute("""
                CREATE TABLE IF NOT EXISTS cache_entries (
                    key TEXT PRIMARY KEY,
                    value_json TEXT NOT NULL,
                    ttl_expires_at TIMESTAMP,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    accessed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            # Create index for TTL cleanup
            conn.execute("CREATE INDEX IF NOT EXISTS idx_cache_ttl ON cache_entries(ttl_expires_at)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_cache_accessed ON cache_entries(accessed_at)")
            conn.commit()
            logger.info("Cache schema initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize cache schema: {e}")
            raise ConnectionError("markitect.db", e) from e

    async def get(
        self,
        key: str,
        context: Optional[ErrorContext] = None
    ) -> Optional[Any]:
        """
        Return the cached value for ``key``.

        Expired entries are purged first. On a hit, ``accessed_at`` is
        refreshed. An entry whose JSON cannot be decoded is deleted.

        Returns:
            The decoded value, or ``None`` when the key is missing, expired,
            corrupt, or any error occurs (errors are logged, never raised).
        """
        if context is None:
            context = ErrorContext(
                operation_id=f"cache_get_{key}",
                operation_type=OperationType.READ,
                resource_type="Cache",
                resource_id=key
            )
        try:
            conn = self.connection_manager.get_database_connection()
            # Clean up expired entries first
            await self._cleanup_expired_entries(conn)

            row = conn.execute(
                """
                SELECT value_json FROM cache_entries
                WHERE key = ?
                  AND (ttl_expires_at IS NULL OR datetime(ttl_expires_at) > CURRENT_TIMESTAMP)
                """,
                (key,)
            ).fetchone()
            if row is None:
                return None

            conn.execute(
                "UPDATE cache_entries SET accessed_at = CURRENT_TIMESTAMP WHERE key = ?",
                (key,)
            )
            conn.commit()
            return json.loads(row[0])
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse cached value for key {key}: {e}")
            # Remove corrupted cache entry; stay best-effort so get() keeps its
            # "return None on error" contract even if the delete fails.
            try:
                conn.execute("DELETE FROM cache_entries WHERE key = ?", (key,))
                conn.commit()
            except Exception as cleanup_error:
                logger.warning(f"Failed to remove corrupted cache entry {key}: {cleanup_error}")
            return None
        except Exception as e:
            logger.error(f"Error getting cache value for key {key}: {e}")
            return None

    async def set(
        self,
        key: str,
        value: Any,
        ttl: Optional[int] = None,
        context: Optional[ErrorContext] = None
    ) -> bool:
        """
        Store ``value`` (JSON-serialized) under ``key``, replacing any entry.

        Args:
            ttl: Lifetime in seconds. A falsy value (``None`` or ``0``) stores
                the entry without an expiry.

        Returns:
            True on success, False on any error (logged, not raised).
        """
        if context is None:
            context = ErrorContext(
                operation_id=f"cache_set_{key}",
                operation_type=OperationType.WRITE,
                resource_type="Cache",
                resource_id=key,
                request_data={"ttl": ttl}
            )
        try:
            conn = self.connection_manager.get_database_connection()
            # Expiry is written in CURRENT_TIMESTAMP's UTC text format so it
            # compares correctly against it.
            expires_at = None
            if ttl:
                expires_at = (
                    datetime.now(timezone.utc) + timedelta(seconds=ttl)
                ).strftime(self._SQLITE_TIMESTAMP_FORMAT)

            value_json = json.dumps(value)
            # Upsert cache entry
            conn.execute(
                """
                INSERT OR REPLACE INTO cache_entries (key, value_json, ttl_expires_at, created_at, accessed_at)
                VALUES (?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
                """,
                (key, value_json, expires_at)
            )
            conn.commit()
            return True
        except Exception as e:
            logger.error(f"Error setting cache value for key {key}: {e}")
            return False

    async def delete(
        self,
        key: str,
        context: Optional[ErrorContext] = None
    ) -> bool:
        """
        Delete the entry for ``key``.

        Returns:
            True if a row was deleted; False if none existed or on error
            (logged, not raised).
        """
        if context is None:
            context = ErrorContext(
                operation_id=f"cache_delete_{key}",
                operation_type=OperationType.DELETE,
                resource_type="Cache",
                resource_id=key
            )
        try:
            conn = self.connection_manager.get_database_connection()
            cursor = conn.execute("DELETE FROM cache_entries WHERE key = ?", (key,))
            conn.commit()
            return cursor.rowcount > 0
        except Exception as e:
            logger.error(f"Error deleting cache value for key {key}: {e}")
            return False

    async def invalidate_pattern(
        self,
        pattern: str,
        context: Optional[ErrorContext] = None
    ) -> int:
        """
        Delete every cache entry whose key matches ``pattern``.

        ``*`` matches any run of characters. Every other character, including
        the LIKE metacharacters ``%`` and ``_``, matches literally.
        NOTE: SQLite's LIKE compares ASCII letters case-insensitively by default.

        Returns:
            Number of entries deleted.

        Raises:
            QueryError: If the delete fails.
        """
        if context is None:
            context = ErrorContext(
                operation_id=f"cache_invalidate_{pattern}",
                operation_type=OperationType.DELETE,
                resource_type="Cache",
                metadata={"pattern": pattern}
            )
        try:
            conn = self.connection_manager.get_database_connection()
            # Escape LIKE metacharacters (escape char first), then map '*' -> '%'.
            escaped = (
                pattern.replace("\\", "\\\\")
                .replace("%", "\\%")
                .replace("_", "\\_")
            )
            sql_pattern = escaped.replace("*", "%")
            cursor = conn.execute(
                "DELETE FROM cache_entries WHERE key LIKE ? ESCAPE '\\'",
                (sql_pattern,)
            )
            conn.commit()
            deleted_count = cursor.rowcount
            logger.info(f"Invalidated {deleted_count} cache entries matching pattern '{pattern}'")
            return deleted_count
        except Exception as e:
            logger.error(f"Error invalidating cache pattern {pattern}: {e}")
            raise QueryError(
                f"DELETE FROM cache_entries WHERE key LIKE '{pattern}'", {"pattern": pattern}, e, context
            ) from e

    async def store_ast_cache(
        self,
        document_id: str,
        ast: Dict[str, Any],
        context: Optional[ErrorContext] = None
    ) -> bool:
        """
        Write ``ast`` to a JSON file and record it in ``ast_cache``.

        The file is written to ``.cache/ast/<document_id>/<uuid>.json``,
        relative to the current working directory. Each call creates a new
        file and row; earlier ones are not removed. If any step fails, the
        file written by this call is removed.

        NOTE(review): the ``ast_cache`` table is created by
        SqliteDocumentRepository._initialize_schema, not by this class.

        Returns:
            True on success, False on any error (logged, not raised).
        """
        if context is None:
            context = ErrorContext(
                operation_id=f"store_ast_cache_{document_id}",
                operation_type=OperationType.WRITE,
                resource_type="ASTCache",
                resource_id=document_id
            )
        cache_file: Optional[Path] = None
        try:
            conn = self.connection_manager.get_database_connection()
            cache_id = str(uuid.uuid4())
            # Keep the stored path as a forward-slash string, as before.
            cache_path = f".cache/ast/{document_id}/{cache_id}.json"
            cache_file = Path(cache_path)
            cache_file.parent.mkdir(parents=True, exist_ok=True)

            with cache_file.open("w", encoding="utf-8") as f:
                json.dump(ast, f, indent=2)
            cache_size = cache_file.stat().st_size

            # Store cache metadata in database
            conn.execute(
                """
                INSERT OR REPLACE INTO ast_cache (id, document_id, cache_path, cache_size, created_at, accessed_at)
                VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
                """,
                (cache_id, document_id, cache_path, cache_size)
            )
            conn.commit()
            logger.info(f"Stored AST cache for document {document_id} at {cache_path}")
            return True
        except Exception as e:
            logger.error(f"Error storing AST cache for document {document_id}: {e}")
            if cache_file is not None:
                # Do not leave a partial or unreferenced cache file behind.
                try:
                    cache_file.unlink(missing_ok=True)
                except OSError as unlink_error:
                    logger.warning(f"Could not remove cache file {cache_file}: {unlink_error}")
            return False

    async def _cleanup_expired_entries(self, conn: sqlite3.Connection):
        """
        Delete cache entries whose expiry has passed, and commit.

        Committing ensures the DELETE does not leave an open write transaction
        on ``conn``. This is best-effort: failures are logged as warnings and
        never raised.
        """
        try:
            cursor = conn.execute(
                "DELETE FROM cache_entries WHERE datetime(ttl_expires_at) < CURRENT_TIMESTAMP"
            )
            conn.commit()
            deleted_count = cursor.rowcount
            if deleted_count > 0:
                logger.debug(f"Cleaned up {deleted_count} expired cache entries")
        except Exception as e:
            logger.warning(f"Error cleaning up expired cache entries: {e}")