Complete user profile management system with CRUD operations and CLI integration: ## 🎯 Core Features Delivered - **ProfileManager**: Complete CRUD operations with database integration - **JSON Schema validation**: Comprehensive profile data validation - **Multiple profile support**: Named profiles (personal, work, etc.) - **Default profile system**: Set and manage default profiles - **Profile inheritance**: Merge profiles with override capabilities - **Template integration**: Extract flattened variables for template filling ## 📋 Profile Schema & Data Model - **Structured data classes**: ProfileData, ContactInfo, Address, Organization - **JSON Schema validation**: Full validation with field descriptions - **Flexible structure**: Support for nested data and custom fields - **Timestamp management**: Automatic created_at/updated_at tracking ## 🖥️ CLI Integration Complete - **9 CLI Commands**: create, show, list, update, delete, set-default, export, import, variables - **Multiple formats**: JSON, YAML, and table output formats - **Interactive mode**: Guided profile creation and updates - **Export/Import**: Full profile portability with validation - **Template variables**: Extract flattened variables for template systems ## 📊 Implementation Stats - **ProfileManager**: 500+ lines with comprehensive functionality - **ProfileSchema**: 350+ lines with validation and data structures - **CLI Commands**: 450+ lines of professional command interface - **Test Coverage**: 66 tests (36 core + 30 CLI) with 100% pass rate ## 🚀 **Ready for Template Integration** Foundation complete for Issue #99 (Auto Fill Templates) with: - Template variable extraction from profiles - Default profile system for seamless integration - Profile merging for complex template scenarios - Professional CLI for user profile management 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
663 lines
21 KiB
Python
663 lines
21 KiB
Python
"""
|
|
User Profile Manager for MarkiTect.
|
|
|
|
This module provides comprehensive CRUD operations for user profiles including
|
|
database integration, validation, and profile inheritance functionality.
|
|
"""
|
|
|
|
import sqlite3
|
|
import json
|
|
import os
|
|
from datetime import datetime
|
|
from typing import Dict, Any, Optional, List, Union
|
|
from pathlib import Path
|
|
from jsonschema import ValidationError
|
|
|
|
from .schema import ProfileSchema, ProfileData
|
|
from ..database import DatabaseManager
|
|
|
|
|
|
class ProfileNotFoundError(Exception):
    """Signal that a requested profile could not be found."""
|
|
|
|
|
|
class ProfileValidationError(Exception):
    """Signal that profile data did not pass schema validation."""
|
|
|
|
|
|
class ProfileManager:
    """
    Manage named user profiles stored in a SQLite database.

    Provides create/read/update/delete operations, schema validation of the
    profile payload, a single "default" profile, import/export, merging of
    two profiles, and flattening of a profile into template variables.
    """
|
|
|
|
def __init__(self, db_path: Optional[str] = None):
    """
    Set up the manager and make sure its storage exists.

    Args:
        db_path: Path to the SQLite database file. When falsy (None or an
            empty string) the path is read from the current MarkiTect
            configuration under ``database_path``, falling back to
            ``markitect.db``.
    """
    if not db_path:
        # The configuration manager is only needed (and only imported)
        # when the caller did not supply an explicit path.
        from ..config_manager import ConfigurationManager

        settings = ConfigurationManager().get_current_config()
        db_path = settings.get('database_path', 'markitect.db')

    self.db_path = db_path
    self._ensure_database_initialized()
|
|
|
|
def _ensure_database_initialized(self) -> None:
    """
    Make sure the database file and the ``user_profiles`` table exist.

    When no file exists at ``self.db_path`` the main database is first
    initialised through ``DatabaseManager``; the profiles table is then
    created (if missing) in every case.
    """
    database_missing = not os.path.exists(self.db_path)
    if database_missing:
        DatabaseManager(self.db_path).initialize_database()

    self._create_profiles_table()
|
|
|
|
def _create_profiles_table(self) -> None:
    """
    Create the ``user_profiles`` table and its indexes if they are missing.

    All statements use ``IF NOT EXISTS`` so the call can be repeated.

    Raises:
        RuntimeError: If SQLite reports an error; the transaction is
            rolled back first.
    """
    schema_statements = (
        '''
        CREATE TABLE IF NOT EXISTS user_profiles (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL UNIQUE,
            description TEXT,
            data TEXT NOT NULL, -- JSON data
            is_active BOOLEAN DEFAULT TRUE,
            is_default BOOLEAN DEFAULT FALSE,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        # Indexes for lookups by name and by active flag.
        '''
        CREATE INDEX IF NOT EXISTS idx_user_profiles_name
        ON user_profiles(name)
        ''',
        '''
        CREATE INDEX IF NOT EXISTS idx_user_profiles_active
        ON user_profiles(is_active)
        ''',
    )

    connection = sqlite3.connect(self.db_path)
    db = connection.cursor()

    try:
        for statement in schema_statements:
            db.execute(statement)
        connection.commit()
    except sqlite3.Error as e:
        connection.rollback()
        raise RuntimeError(f"Failed to create profiles table: {e}")
    finally:
        connection.close()
|
|
|
|
def create_profile(self, name: str, data: Union[Dict[str, Any], ProfileData],
                   description: Optional[str] = None,
                   set_as_default: bool = False) -> int:
    """
    Insert a new profile row.

    The payload is copied (or converted from ``ProfileData``), stamped with
    ``created_at`` (only if absent) and ``updated_at``, validated against
    ``ProfileSchema`` and stored as JSON.

    Args:
        name: Unique profile name. The uniqueness check looks at every row,
            including inactive ones.
        data: Profile data (dict or ProfileData instance).
        description: Optional profile description.
        set_as_default: When True, any existing default flag is cleared and
            the new profile becomes the default.

    Returns:
        ID of the created profile row.

    Raises:
        ProfileValidationError: If the data fails schema validation.
        ValueError: If a profile with this name already exists.
        RuntimeError: On any other SQLite error.
    """
    payload = data.to_dict() if isinstance(data, ProfileData) else data.copy()

    # One timestamp is shared by the JSON payload and the row columns.
    timestamp = datetime.now().isoformat()
    payload.setdefault('created_at', timestamp)
    payload['updated_at'] = timestamp

    try:
        ProfileSchema.validate(payload)
    except ValidationError as e:
        raise ProfileValidationError(f"Profile data validation failed: {e.message}")

    connection = sqlite3.connect(self.db_path)
    db = connection.cursor()

    try:
        db.execute('SELECT id FROM user_profiles WHERE name = ?', (name,))
        if db.fetchone() is not None:
            raise ValueError(f"Profile '{name}' already exists")

        if set_as_default:
            # Only one row may carry the default flag.
            db.execute('''
                UPDATE user_profiles SET is_default = FALSE
                WHERE is_default = TRUE
            ''')

        row_values = (
            name,
            description,
            json.dumps(payload, indent=2),
            set_as_default,
            timestamp,
            timestamp,
        )
        db.execute('''
            INSERT INTO user_profiles
            (name, description, data, is_default, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', row_values)

        connection.commit()
        return db.lastrowid

    except sqlite3.IntegrityError as e:
        connection.rollback()
        # The UNIQUE constraint on name can still fire if another writer
        # inserted the same name after the check above.
        if "UNIQUE constraint failed" in str(e):
            raise ValueError(f"Profile '{name}' already exists")
        raise RuntimeError(f"Database error: {e}")

    except sqlite3.Error as e:
        connection.rollback()
        raise RuntimeError(f"Failed to create profile: {e}")

    finally:
        connection.close()
|
|
|
|
def get_profile(self, name: str) -> ProfileData:
    """
    Load an active profile by name.

    Args:
        name: Profile name.

    Returns:
        ProfileData built from the stored JSON payload.

    Raises:
        ProfileNotFoundError: If no active profile has this name.
        RuntimeError: If SQLite reports an error.
    """
    connection = sqlite3.connect(self.db_path)
    db = connection.cursor()

    try:
        db.execute('''
            SELECT data FROM user_profiles
            WHERE name = ? AND is_active = TRUE
        ''', (name,))

        record = db.fetchone()
        if record is None:
            raise ProfileNotFoundError(f"Profile '{name}' not found")

        (stored_json,) = record
        return ProfileData.from_dict(json.loads(stored_json))

    except sqlite3.Error as e:
        raise RuntimeError(f"Failed to get profile: {e}")

    finally:
        connection.close()
|
|
|
|
def get_profile_info(self, name: str) -> Dict[str, Any]:
    """
    Return the metadata columns of an active profile (no JSON payload).

    Args:
        name: Profile name.

    Returns:
        Dictionary with the keys ``id``, ``name``, ``description``,
        ``is_active``, ``is_default``, ``created_at`` and ``updated_at``;
        the two flags are converted to ``bool``.

    Raises:
        ProfileNotFoundError: If no active profile has this name.
        RuntimeError: If SQLite reports an error.
    """
    # Key order mirrors the SELECT column order below.
    columns = ('id', 'name', 'description', 'is_active', 'is_default',
               'created_at', 'updated_at')

    connection = sqlite3.connect(self.db_path)
    db = connection.cursor()

    try:
        db.execute('''
            SELECT id, name, description, is_active, is_default,
                   created_at, updated_at
            FROM user_profiles
            WHERE name = ? AND is_active = TRUE
        ''', (name,))

        record = db.fetchone()
        if record is None:
            raise ProfileNotFoundError(f"Profile '{name}' not found")

        info = dict(zip(columns, record))
        for flag in ('is_active', 'is_default'):
            info[flag] = bool(info[flag])
        return info

    except sqlite3.Error as e:
        raise RuntimeError(f"Failed to get profile info: {e}")

    finally:
        connection.close()
|
|
|
|
def update_profile(self, name: str, data: Union[Dict[str, Any], ProfileData],
                   description: Optional[str] = None) -> bool:
    """
    Replace the stored payload of an active profile.

    The payload is copied (or converted from ``ProfileData``), its
    ``updated_at`` field is refreshed, and it is validated before any
    database access happens.

    Args:
        name: Profile name.
        data: Updated profile data.
        description: New description; the stored description is left
            untouched when this is None.

    Returns:
        True if a row was updated.

    Raises:
        ProfileNotFoundError: If no active profile has this name.
        ProfileValidationError: If the data fails schema validation.
        RuntimeError: If SQLite reports an error.
    """
    payload = data.to_dict() if isinstance(data, ProfileData) else data.copy()
    payload['updated_at'] = datetime.now().isoformat()

    try:
        ProfileSchema.validate(payload)
    except ValidationError as e:
        raise ProfileValidationError(f"Profile data validation failed: {e.message}")

    connection = sqlite3.connect(self.db_path)
    db = connection.cursor()

    try:
        db.execute('SELECT id FROM user_profiles WHERE name = ? AND is_active = TRUE', (name,))
        if db.fetchone() is None:
            raise ProfileNotFoundError(f"Profile '{name}' not found")

        # The SET clause is assembled from fixed fragments only; every
        # value travels as a bound parameter.
        assignments = ['data = ?', 'updated_at = ?']
        parameters = [json.dumps(payload, indent=2), payload['updated_at']]

        if description is not None:
            assignments.append('description = ?')
            parameters.append(description)

        parameters.append(name)  # bound to the WHERE clause
        set_clause = ', '.join(assignments)

        db.execute(f'''
            UPDATE user_profiles
            SET {set_clause}
            WHERE name = ? AND is_active = TRUE
        ''', parameters)

        connection.commit()
        return db.rowcount > 0

    except sqlite3.Error as e:
        connection.rollback()
        raise RuntimeError(f"Failed to update profile: {e}")

    finally:
        connection.close()
|
|
|
|
def delete_profile(self, name: str, hard_delete: bool = False) -> bool:
    """
    Delete a profile, either permanently or by marking it inactive.

    The existence check is not restricted to active rows, so an already
    inactive profile can still be hard-deleted.

    Args:
        name: Profile name.
        hard_delete: If True, remove the row. If False, mark the row as
            inactive and clear its default flag.

    Returns:
        True if a row was deleted or updated.

    Raises:
        ProfileNotFoundError: If no row has this name.
        RuntimeError: If SQLite reports an error.
    """
    conn = sqlite3.connect(self.db_path)
    cursor = conn.cursor()

    try:
        cursor.execute('SELECT id FROM user_profiles WHERE name = ?', (name,))
        if cursor.fetchone() is None:
            raise ProfileNotFoundError(f"Profile '{name}' not found")

        if hard_delete:
            cursor.execute('DELETE FROM user_profiles WHERE name = ?', (name,))
        else:
            # An inactive profile must not keep the default flag, otherwise
            # listings that include inactive rows would still report a
            # deleted profile as the default.
            cursor.execute('''
                UPDATE user_profiles
                SET is_active = FALSE, is_default = FALSE, updated_at = ?
                WHERE name = ?
            ''', (datetime.now().isoformat(), name))

        conn.commit()
        return cursor.rowcount > 0

    except sqlite3.Error as e:
        conn.rollback()
        raise RuntimeError(f"Failed to delete profile: {e}") from e

    finally:
        conn.close()
|
|
|
|
def list_profiles(self, include_inactive: bool = False) -> List[Dict[str, Any]]:
    """
    Return metadata for the stored profiles.

    Rows are ordered with the default profile first, then by name.

    Args:
        include_inactive: Whether inactive profiles are included.

    Returns:
        List of dictionaries with the keys ``id``, ``name``,
        ``description``, ``is_active``, ``is_default``, ``created_at`` and
        ``updated_at``.

    Raises:
        RuntimeError: If SQLite reports an error.
    """
    columns = ('id', 'name', 'description', 'is_active', 'is_default',
               'created_at', 'updated_at')
    # Fixed SQL fragment chosen by the flag; no caller data is interpolated.
    active_filter = '' if include_inactive else 'WHERE is_active = TRUE'

    connection = sqlite3.connect(self.db_path)
    db = connection.cursor()

    try:
        db.execute(f'''
            SELECT id, name, description, is_active, is_default,
                   created_at, updated_at
            FROM user_profiles
            {active_filter}
            ORDER BY is_default DESC, name ASC
        ''')

        listing = []
        for record in db.fetchall():
            entry = dict(zip(columns, record))
            entry['is_active'] = bool(entry['is_active'])
            entry['is_default'] = bool(entry['is_default'])
            listing.append(entry)
        return listing

    except sqlite3.Error as e:
        raise RuntimeError(f"Failed to list profiles: {e}")

    finally:
        connection.close()
|
|
|
|
def set_default_profile(self, name: str) -> bool:
    """
    Make an active profile the default one.

    The default flag is cleared on every row before it is set on the
    requested profile; both updates are committed together.

    Args:
        name: Profile name.

    Returns:
        True if the profile row was updated.

    Raises:
        ProfileNotFoundError: If no active profile has this name.
        RuntimeError: If SQLite reports an error.
    """
    connection = sqlite3.connect(self.db_path)
    db = connection.cursor()

    try:
        db.execute('SELECT id FROM user_profiles WHERE name = ? AND is_active = TRUE', (name,))
        if db.fetchone() is None:
            raise ProfileNotFoundError(f"Profile '{name}' not found")

        # Clear the flag everywhere first so only one row ends up with it.
        db.execute('UPDATE user_profiles SET is_default = FALSE')

        changed_at = datetime.now().isoformat()
        db.execute('''
            UPDATE user_profiles
            SET is_default = TRUE, updated_at = ?
            WHERE name = ? AND is_active = TRUE
        ''', (changed_at, name))

        connection.commit()
        return db.rowcount > 0

    except sqlite3.Error as e:
        connection.rollback()
        raise RuntimeError(f"Failed to set default profile: {e}")

    finally:
        connection.close()
|
|
|
|
def get_default_profile(self) -> Optional[ProfileData]:
    """
    Load the profile currently flagged as default.

    Only rows that are both default and active are considered.

    Returns:
        ProfileData of the default profile, or None when there is none.

    Raises:
        RuntimeError: If SQLite reports an error.
    """
    connection = sqlite3.connect(self.db_path)
    db = connection.cursor()

    try:
        db.execute('''
            SELECT name, data FROM user_profiles
            WHERE is_default = TRUE AND is_active = TRUE
            LIMIT 1
        ''')

        record = db.fetchone()
        if record is None:
            return None

        # The name column is selected but only the JSON payload is used.
        _profile_name, stored_json = record
        return ProfileData.from_dict(json.loads(stored_json))

    except sqlite3.Error as e:
        raise RuntimeError(f"Failed to get default profile: {e}")

    finally:
        connection.close()
|
|
|
|
def export_profile(self, name: str, format: str = 'json') -> str:
    """
    Serialise a profile and its metadata to a string.

    The exported document has two top-level keys: ``profile_info`` (the
    metadata from ``get_profile_info``) and ``profile_data`` (the profile
    payload as a dict). The profile is loaded before the format is checked.

    Args:
        name: Profile name.
        format: Export format, ``'json'`` or ``'yaml'`` (case-insensitive).

    Returns:
        The exported profile as a string.

    Raises:
        ProfileNotFoundError: If the profile doesn't exist.
        ValueError: If the format is unsupported or PyYAML is missing.
    """
    profile = self.get_profile(name)
    metadata = self.get_profile_info(name)
    bundle = {
        'profile_info': metadata,
        'profile_data': profile.to_dict(),
    }

    requested = format.lower()

    if requested == 'json':
        return json.dumps(bundle, indent=2, ensure_ascii=False)

    if requested != 'yaml':
        raise ValueError(f"Unsupported export format: {format}")

    try:
        # PyYAML is optional and only imported when YAML is requested.
        import yaml
        return yaml.dump(bundle, default_flow_style=False, allow_unicode=True)
    except ImportError:
        raise ValueError("YAML format requires PyYAML package")
|
|
|
|
def import_profile(self, name: str, data: str, format: str = 'json',
                   overwrite: bool = False) -> int:
    """
    Import a profile from a JSON or YAML string.

    The document may either be an export bundle (with ``profile_data`` and
    optionally ``profile_info``) or a bare profile mapping. A description
    is taken from ``profile_info.description`` when present.

    Args:
        name: Name to store the profile under (overrides any name in data).
        data: Profile data string.
        format: Import format, ``'json'`` or ``'yaml'`` (case-insensitive).
        overwrite: Whether an existing profile of that name is replaced.

    Returns:
        ID of the imported (created or updated) profile.

    Raises:
        ValueError: If the format is unsupported, PyYAML is missing, the
            parsed document is not a mapping, or the profile exists and
            ``overwrite`` is False.
        ProfileValidationError: If the profile data is invalid.
    """
    requested = format.lower()
    if requested == 'json':
        import_data = json.loads(data)
    elif requested == 'yaml':
        try:
            import yaml
        except ImportError as e:
            raise ValueError("YAML format requires PyYAML package") from e
        import_data = yaml.safe_load(data)
    else:
        raise ValueError(f"Unsupported import format: {format}")

    # Valid JSON/YAML can still be a list, scalar or null (e.g. an empty
    # YAML document); reject those up front instead of failing later with
    # an unrelated TypeError/AttributeError.
    if not isinstance(import_data, dict):
        raise ValueError(
            f"Imported profile must be a mapping, got {type(import_data).__name__}"
        )

    # Accept both an export bundle and a bare profile mapping.
    profile_data = import_data.get('profile_data', import_data)
    if not isinstance(profile_data, dict):
        raise ValueError(
            f"Imported profile data must be a mapping, got {type(profile_data).__name__}"
        )

    profile_info = import_data.get('profile_info')
    description = (
        profile_info.get('description') if isinstance(profile_info, dict) else None
    )

    # Keep the try limited to the lookup so that a ProfileNotFoundError
    # raised while updating is not mistaken for "profile does not exist".
    try:
        existing_profile = self.get_profile_info(name)
    except ProfileNotFoundError:
        return self.create_profile(name, profile_data, description)

    if not overwrite:
        raise ValueError(f"Profile '{name}' already exists. Use overwrite=True to replace.")

    self.update_profile(name, profile_data, description)
    return existing_profile['id']
|
|
|
|
def merge_profiles(self, base_profile: str, override_profile: str) -> ProfileData:
    """
    Combine two stored profiles, letting the override profile win.

    Nested dictionaries are merged key by key. A value of None in the
    override never replaces a value from the base. Nothing is written
    back to the database.

    Args:
        base_profile: Name of the profile providing the defaults.
        override_profile: Name of the profile whose values take precedence.

    Returns:
        ProfileData built from the merged dictionary.

    Raises:
        ProfileNotFoundError: If either profile doesn't exist.
    """
    def overlay(lower: Dict[str, Any], upper: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of ``lower`` with ``upper`` layered on top."""
        combined = lower.copy()
        for key, incoming in upper.items():
            current = combined.get(key)
            if isinstance(current, dict) and isinstance(incoming, dict):
                combined[key] = overlay(current, incoming)
            elif incoming is not None:
                combined[key] = incoming
        return combined

    base_values = self.get_profile(base_profile).to_dict()
    override_values = self.get_profile(override_profile).to_dict()

    return ProfileData.from_dict(overlay(base_values, override_values))
|
|
|
|
def get_template_variables(self, profile_name: Optional[str] = None) -> Dict[str, Any]:
    """
    Flatten a profile into a single-level dict for template filling.

    Nested keys are joined with dots (``contact.email``), lists become
    comma-separated strings, None values are dropped, and values of any
    other type are skipped. When both ``first_name`` and ``last_name``
    exist at the top level, a ``full_name`` entry is added.

    Args:
        profile_name: Profile name; the default profile is used when this
            is None or empty.

    Returns:
        Flattened dictionary, or an empty dict when no name was given and
        there is no default profile.

    Raises:
        ProfileNotFoundError: If the named profile doesn't exist.
    """
    if profile_name:
        profile = self.get_profile(profile_name)
    else:
        profile = self.get_default_profile()
        if not profile:
            return {}

    def walk(mapping: Dict[str, Any], prefix: str = ''):
        """Yield (dotted_key, value) pairs for every usable leaf."""
        for key, value in mapping.items():
            if value is None:
                continue

            dotted = f"{prefix}.{key}" if prefix else key

            if isinstance(value, dict):
                yield from walk(value, dotted)
            elif isinstance(value, (str, int, float, bool)):
                yield dotted, value
            elif isinstance(value, list):
                # Lists are rendered as one comma-separated string.
                yield dotted, ', '.join(str(item) for item in value)

    variables = dict(walk(profile.to_dict()))

    # Convenience variable derived from the two name fields.
    if 'first_name' in variables and 'last_name' in variables:
        variables['full_name'] = f"{variables['first_name']} {variables['last_name']}"

    return variables