diff --git a/markitect/spaces/__init__.py b/markitect/spaces/__init__.py index a66f6eaf..d53b5b35 100644 --- a/markitect/spaces/__init__.py +++ b/markitect/spaces/__init__.py @@ -60,6 +60,29 @@ from .events import ( reset_event_bus, ) +# Phase 7: Composability +from .composability import ( + # Models + SpaceReference, + SpaceReferenceType, + SpacePermission, + SpaceRole, + AccessLevel, + SpaceAccess, + InheritedVariable, + InheritedConfig, + # Services + ComposableSpaceService, + InheritanceResolver, + AccessControlService, + # Repositories + ISpaceReferenceRepository, + IAccessControlRepository, + SqliteSpaceReferenceRepository, + SqliteAccessControlRepository, +) +from .composability.service import CircularReferenceError + __all__ = [ # Models "InformationSpace", @@ -88,4 +111,23 @@ __all__ = [ "EventBus", "get_event_bus", "reset_event_bus", + # Composability - Models + "SpaceReference", + "SpaceReferenceType", + "SpacePermission", + "SpaceRole", + "AccessLevel", + "SpaceAccess", + "InheritedVariable", + "InheritedConfig", + # Composability - Services + "ComposableSpaceService", + "InheritanceResolver", + "AccessControlService", + "CircularReferenceError", + # Composability - Repositories + "ISpaceReferenceRepository", + "IAccessControlRepository", + "SqliteSpaceReferenceRepository", + "SqliteAccessControlRepository", ] diff --git a/markitect/spaces/composability/__init__.py b/markitect/spaces/composability/__init__.py new file mode 100644 index 00000000..2517a495 --- /dev/null +++ b/markitect/spaces/composability/__init__.py @@ -0,0 +1,49 @@ +""" +Composability module for Information Spaces. + +Provides space-to-space references, variable/config inheritance, +and basic access control for the Information Space system. +""" + +from .models import ( + SpaceReference, + SpaceReferenceType, + SpacePermission, + SpaceRole, + AccessLevel, + SpaceAccess, + InheritedVariable, + InheritedConfig, +) +from .service import ( + ComposableSpaceService, + InheritanceResolver, + AccessControlService, +) +from .repository import ( + ISpaceReferenceRepository, + IAccessControlRepository, + SqliteSpaceReferenceRepository, + SqliteAccessControlRepository, +) + +__all__ = [ + # Models + "SpaceReference", + "SpaceReferenceType", + "SpacePermission", + "SpaceRole", + "AccessLevel", + "SpaceAccess", + "InheritedVariable", + "InheritedConfig", + # Services + "ComposableSpaceService", + "InheritanceResolver", + "AccessControlService", + # Repositories + "ISpaceReferenceRepository", + "IAccessControlRepository", + "SqliteSpaceReferenceRepository", + "SqliteAccessControlRepository", +] diff --git a/markitect/spaces/composability/models.py b/markitect/spaces/composability/models.py new file mode 100644 index 00000000..6e07611f --- /dev/null +++ b/markitect/spaces/composability/models.py @@ -0,0 +1,271 @@ +""" +Domain models for space composability. + +Defines models for space references, inheritance, and access control. +""" + +import uuid +from dataclasses import dataclass, field +from datetime import datetime +from typing import Dict, Any, List, Optional +from enum import Enum + + +class SpaceReferenceType(Enum): + """Type of reference between spaces.""" + INCLUDES = "includes" # Space A includes content from Space B + EXTENDS = "extends" # Space A extends/inherits from Space B + LINKS_TO = "links_to" # Space A has a soft link to Space B + COMPOSED_OF = "composed_of" # Space A is composed of multiple spaces + + +@dataclass +class SpaceReference: + """ + Represents a reference between two spaces. + + Unlike parent_space_id (single inheritance), space references allow + multiple relationships between spaces for composition. + + Attributes: + id: Unique reference identifier + source_space_id: The space making the reference + target_space_id: The space being referenced + reference_type: Type of reference relationship + alias: Optional alias for the referenced space + metadata: Additional reference metadata + created_at: When the reference was created + """ + id: str = field(default_factory=lambda: str(uuid.uuid4())) + source_space_id: str = "" + target_space_id: str = "" + reference_type: SpaceReferenceType = SpaceReferenceType.LINKS_TO + alias: Optional[str] = None + metadata: Dict[str, Any] = field(default_factory=dict) + created_at: datetime = field(default_factory=datetime.now) + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary for serialization.""" + return { + "id": self.id, + "source_space_id": self.source_space_id, + "target_space_id": self.target_space_id, + "reference_type": self.reference_type.value, + "alias": self.alias, + "metadata": self.metadata, + "created_at": self.created_at.isoformat(), + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "SpaceReference": + """Create from dictionary.""" + created_at = data.get("created_at") + if isinstance(created_at, str): + created_at = datetime.fromisoformat(created_at) + elif created_at is None: + created_at = datetime.now() + + ref_type = data.get("reference_type", "links_to") + if isinstance(ref_type, str): + ref_type = SpaceReferenceType(ref_type) + + return cls( + id=data.get("id", str(uuid.uuid4())), + source_space_id=data.get("source_space_id", ""), + target_space_id=data.get("target_space_id", ""), + reference_type=ref_type, + alias=data.get("alias"), + metadata=data.get("metadata", {}), + created_at=created_at, + ) + + +class AccessLevel(Enum): + """Access level for space permissions.""" + NONE = "none" # No access + READ = "read" # Read-only access + WRITE = "write" # Read and write access + ADMIN = "admin" # Full administrative access + + +class SpaceRole(Enum): + """Predefined roles for space access.""" + OWNER = "owner" # Full control, can delete space + ADMIN = "admin" # Can manage members and settings + EDITOR = "editor" # Can edit documents and variables + VIEWER = "viewer" # Read-only access + GUEST = "guest" # Limited read access + + +@dataclass +class SpacePermission: + """ + Permission entry for a space. + + Maps a principal (user/group/role) to an access level for a space. + + Attributes: + space_id: The space this permission applies to + principal_type: Type of principal (user, group, role) + principal_id: Identifier for the principal + access_level: The access level granted + role: Optional predefined role + granted_by: Who granted this permission + granted_at: When permission was granted + expires_at: Optional expiration timestamp + """ + space_id: str + principal_type: str # "user", "group", "role" + principal_id: str + access_level: AccessLevel = AccessLevel.READ + role: Optional[SpaceRole] = None + granted_by: Optional[str] = None + granted_at: datetime = field(default_factory=datetime.now) + expires_at: Optional[datetime] = None + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary.""" + return { + "space_id": self.space_id, + "principal_type": self.principal_type, + "principal_id": self.principal_id, + "access_level": self.access_level.value, + "role": self.role.value if self.role else None, + "granted_by": self.granted_by, + "granted_at": self.granted_at.isoformat(), + "expires_at": self.expires_at.isoformat() if self.expires_at else None, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "SpacePermission": + """Create from dictionary.""" + granted_at = data.get("granted_at") + if isinstance(granted_at, str): + granted_at = datetime.fromisoformat(granted_at) + elif granted_at is None: + granted_at = datetime.now() + + expires_at = data.get("expires_at") + if isinstance(expires_at, str): + expires_at = datetime.fromisoformat(expires_at) + + access_level = data.get("access_level", "read") + if isinstance(access_level, str): + access_level = AccessLevel(access_level) + + role = data.get("role") + if isinstance(role, str): + role = SpaceRole(role) + + return cls( + space_id=data["space_id"], + principal_type=data.get("principal_type", "user"), + principal_id=data["principal_id"], + access_level=access_level, + role=role, + granted_by=data.get("granted_by"), + granted_at=granted_at, + expires_at=expires_at, + ) + + def is_expired(self) -> bool: + """Check if permission has expired.""" + if self.expires_at is None: + return False + return datetime.now() > self.expires_at + + def has_access(self, required_level: AccessLevel) -> bool: + """Check if this permission grants the required access level.""" + if self.is_expired(): + return False + + level_order = [AccessLevel.NONE, AccessLevel.READ, AccessLevel.WRITE, AccessLevel.ADMIN] + return level_order.index(self.access_level) >= level_order.index(required_level) + + +@dataclass +class SpaceAccess: + """ + Computed access result for a principal on a space. + + Combines all applicable permissions to determine effective access. + + Attributes: + space_id: The space being accessed + principal_id: The principal accessing + effective_level: Computed effective access level + roles: All roles that apply + inherited_from: Space IDs from which access was inherited + permissions: Source permissions that contributed + """ + space_id: str + principal_id: str + effective_level: AccessLevel = AccessLevel.NONE + roles: List[SpaceRole] = field(default_factory=list) + inherited_from: List[str] = field(default_factory=list) + permissions: List[SpacePermission] = field(default_factory=list) + + def can_read(self) -> bool: + """Check if principal can read.""" + return self.effective_level in [AccessLevel.READ, AccessLevel.WRITE, AccessLevel.ADMIN] + + def can_write(self) -> bool: + """Check if principal can write.""" + return self.effective_level in [AccessLevel.WRITE, AccessLevel.ADMIN] + + def is_admin(self) -> bool: + """Check if principal has admin access.""" + return self.effective_level == AccessLevel.ADMIN + + +@dataclass +class InheritedVariable: + """ + A variable with its inheritance source. + + Attributes: + name: Variable name + value: Variable value + source_space_id: Space where this variable is defined + inheritance_depth: How many levels up the inheritance chain (0 = local) + scope: Variable scope + """ + name: str + value: Any + source_space_id: str + inheritance_depth: int = 0 + scope: str = "space" + + def is_local(self) -> bool: + """Check if variable is locally defined (not inherited).""" + return self.inheritance_depth == 0 + + +@dataclass +class InheritedConfig: + """ + Configuration with inheritance tracking. + + Attributes: + default_variant: Effective default variant + enable_caching: Effective caching setting + theme: Effective theme + history_enabled: Effective history setting + variable_scope: Effective variable scope + source_spaces: Map of config key to source space ID + """ + default_variant: str = "hierarchical" + enable_caching: bool = True + theme: Optional[str] = None + history_enabled: bool = False + variable_scope: str = "space" + source_spaces: Dict[str, str] = field(default_factory=dict) + + def get_source(self, key: str) -> Optional[str]: + """Get the source space ID for a config key.""" + return self.source_spaces.get(key) + + def is_inherited(self, key: str, current_space_id: str) -> bool: + """Check if a config key is inherited from a parent.""" + source = self.source_spaces.get(key) + return source is not None and source != current_space_id diff --git a/markitect/spaces/composability/repository.py b/markitect/spaces/composability/repository.py new file mode 100644 index 00000000..b874615d --- /dev/null +++ b/markitect/spaces/composability/repository.py @@ -0,0 +1,424 @@ +""" +Repository implementations for space composability. + +Provides storage for space references and access control. +""" + +import json +import sqlite3 +from abc import ABC, abstractmethod +from pathlib import Path +from typing import List, Optional +from datetime import datetime + +from .models import ( + SpaceReference, + SpaceReferenceType, + SpacePermission, + AccessLevel, + SpaceRole, +) + + +class ISpaceReferenceRepository(ABC): + """Interface for space reference storage.""" + + @abstractmethod + def add_reference(self, reference: SpaceReference) -> SpaceReference: + """Add a space reference.""" + pass + + @abstractmethod + def get_reference(self, reference_id: str) -> Optional[SpaceReference]: + """Get a reference by ID.""" + pass + + @abstractmethod + def get_references_from( + self, source_space_id: str, ref_type: Optional[SpaceReferenceType] = None + ) -> List[SpaceReference]: + """Get all references from a source space.""" + pass + + @abstractmethod + def get_references_to( + self, target_space_id: str, ref_type: Optional[SpaceReferenceType] = None + ) -> List[SpaceReference]: + """Get all references to a target space.""" + pass + + @abstractmethod + def remove_reference(self, reference_id: str) -> bool: + """Remove a reference.""" + pass + + @abstractmethod + def remove_references_from(self, source_space_id: str) -> int: + """Remove all references from a source space.""" + pass + + @abstractmethod + def reference_exists( + self, source_space_id: str, target_space_id: str, ref_type: SpaceReferenceType + ) -> bool: + """Check if a specific reference exists.""" + pass + + +class IAccessControlRepository(ABC): + """Interface for access control storage.""" + + @abstractmethod + def grant_permission(self, permission: SpacePermission) -> SpacePermission: + """Grant a permission.""" + pass + + @abstractmethod + def revoke_permission( + self, space_id: str, principal_type: str, principal_id: str + ) -> bool: + """Revoke a permission.""" + pass + + @abstractmethod + def get_permissions_for_space(self, space_id: str) -> List[SpacePermission]: + """Get all permissions for a space.""" + pass + + @abstractmethod + def get_permissions_for_principal( + self, principal_type: str, principal_id: str + ) -> List[SpacePermission]: + """Get all permissions for a principal.""" + pass + + @abstractmethod + def get_permission( + self, space_id: str, principal_type: str, principal_id: str + ) -> Optional[SpacePermission]: + """Get a specific permission.""" + pass + + @abstractmethod + def clear_space_permissions(self, space_id: str) -> int: + """Clear all permissions for a space.""" + pass + + +class SqliteSpaceReferenceRepository(ISpaceReferenceRepository): + """SQLite implementation of space reference repository.""" + + def __init__(self, db_path: Path): + """Initialize the repository. + + Args: + db_path: Path to the SQLite database file + """ + self._db_path = db_path + self._init_tables() + + def _get_connection(self) -> sqlite3.Connection: + """Get a database connection.""" + conn = sqlite3.connect(str(self._db_path)) + conn.row_factory = sqlite3.Row + return conn + + def _init_tables(self) -> None: + """Initialize database tables.""" + with self._get_connection() as conn: + conn.execute(""" + CREATE TABLE IF NOT EXISTS space_references ( + id TEXT PRIMARY KEY, + source_space_id TEXT NOT NULL, + target_space_id TEXT NOT NULL, + reference_type TEXT NOT NULL, + alias TEXT, + metadata TEXT, + created_at TEXT NOT NULL, + UNIQUE(source_space_id, target_space_id, reference_type) + ) + """) + conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_space_ref_source + ON space_references(source_space_id) + """) + conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_space_ref_target + ON space_references(target_space_id) + """) + conn.commit() + + def add_reference(self, reference: SpaceReference) -> SpaceReference: + """Add a space reference.""" + with self._get_connection() as conn: + conn.execute( + """ + INSERT OR REPLACE INTO space_references + (id, source_space_id, target_space_id, reference_type, alias, metadata, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, + ( + reference.id, + reference.source_space_id, + reference.target_space_id, + reference.reference_type.value, + reference.alias, + json.dumps(reference.metadata), + reference.created_at.isoformat(), + ), + ) + conn.commit() + return reference + + def get_reference(self, reference_id: str) -> Optional[SpaceReference]: + """Get a reference by ID.""" + with self._get_connection() as conn: + row = conn.execute( + "SELECT * FROM space_references WHERE id = ?", (reference_id,) + ).fetchone() + if row: + return self._row_to_reference(row) + return None + + def get_references_from( + self, source_space_id: str, ref_type: Optional[SpaceReferenceType] = None + ) -> List[SpaceReference]: + """Get all references from a source space.""" + with self._get_connection() as conn: + if ref_type: + rows = conn.execute( + """ + SELECT * FROM space_references + WHERE source_space_id = ? AND reference_type = ? + ORDER BY created_at + """, + (source_space_id, ref_type.value), + ).fetchall() + else: + rows = conn.execute( + """ + SELECT * FROM space_references + WHERE source_space_id = ? + ORDER BY created_at + """, + (source_space_id,), + ).fetchall() + return [self._row_to_reference(row) for row in rows] + + def get_references_to( + self, target_space_id: str, ref_type: Optional[SpaceReferenceType] = None + ) -> List[SpaceReference]: + """Get all references to a target space.""" + with self._get_connection() as conn: + if ref_type: + rows = conn.execute( + """ + SELECT * FROM space_references + WHERE target_space_id = ? AND reference_type = ? + ORDER BY created_at + """, + (target_space_id, ref_type.value), + ).fetchall() + else: + rows = conn.execute( + """ + SELECT * FROM space_references + WHERE target_space_id = ? + ORDER BY created_at + """, + (target_space_id,), + ).fetchall() + return [self._row_to_reference(row) for row in rows] + + def remove_reference(self, reference_id: str) -> bool: + """Remove a reference.""" + with self._get_connection() as conn: + cursor = conn.execute( + "DELETE FROM space_references WHERE id = ?", (reference_id,) + ) + conn.commit() + return cursor.rowcount > 0 + + def remove_references_from(self, source_space_id: str) -> int: + """Remove all references from a source space.""" + with self._get_connection() as conn: + cursor = conn.execute( + "DELETE FROM space_references WHERE source_space_id = ?", + (source_space_id,), + ) + conn.commit() + return cursor.rowcount + + def reference_exists( + self, source_space_id: str, target_space_id: str, ref_type: SpaceReferenceType + ) -> bool: + """Check if a specific reference exists.""" + with self._get_connection() as conn: + row = conn.execute( + """ + SELECT 1 FROM space_references + WHERE source_space_id = ? AND target_space_id = ? AND reference_type = ? + """, + (source_space_id, target_space_id, ref_type.value), + ).fetchone() + return row is not None + + def _row_to_reference(self, row: sqlite3.Row) -> SpaceReference: + """Convert a database row to a SpaceReference.""" + return SpaceReference( + id=row["id"], + source_space_id=row["source_space_id"], + target_space_id=row["target_space_id"], + reference_type=SpaceReferenceType(row["reference_type"]), + alias=row["alias"], + metadata=json.loads(row["metadata"]) if row["metadata"] else {}, + created_at=datetime.fromisoformat(row["created_at"]), + ) + + +class SqliteAccessControlRepository(IAccessControlRepository): + """SQLite implementation of access control repository.""" + + def __init__(self, db_path: Path): + """Initialize the repository. + + Args: + db_path: Path to the SQLite database file + """ + self._db_path = db_path + self._init_tables() + + def _get_connection(self) -> sqlite3.Connection: + """Get a database connection.""" + conn = sqlite3.connect(str(self._db_path)) + conn.row_factory = sqlite3.Row + return conn + + def _init_tables(self) -> None: + """Initialize database tables.""" + with self._get_connection() as conn: + conn.execute(""" + CREATE TABLE IF NOT EXISTS space_permissions ( + space_id TEXT NOT NULL, + principal_type TEXT NOT NULL, + principal_id TEXT NOT NULL, + access_level TEXT NOT NULL, + role TEXT, + granted_by TEXT, + granted_at TEXT NOT NULL, + expires_at TEXT, + PRIMARY KEY(space_id, principal_type, principal_id) + ) + """) + conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_perm_principal + ON space_permissions(principal_type, principal_id) + """) + conn.commit() + + def grant_permission(self, permission: SpacePermission) -> SpacePermission: + """Grant a permission.""" + with self._get_connection() as conn: + conn.execute( + """ + INSERT OR REPLACE INTO space_permissions + (space_id, principal_type, principal_id, access_level, role, + granted_by, granted_at, expires_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + permission.space_id, + permission.principal_type, + permission.principal_id, + permission.access_level.value, + permission.role.value if permission.role else None, + permission.granted_by, + permission.granted_at.isoformat(), + permission.expires_at.isoformat() if permission.expires_at else None, + ), + ) + conn.commit() + return permission + + def revoke_permission( + self, space_id: str, principal_type: str, principal_id: str + ) -> bool: + """Revoke a permission.""" + with self._get_connection() as conn: + cursor = conn.execute( + """ + DELETE FROM space_permissions + WHERE space_id = ? AND principal_type = ? AND principal_id = ? + """, + (space_id, principal_type, principal_id), + ) + conn.commit() + return cursor.rowcount > 0 + + def get_permissions_for_space(self, space_id: str) -> List[SpacePermission]: + """Get all permissions for a space.""" + with self._get_connection() as conn: + rows = conn.execute( + "SELECT * FROM space_permissions WHERE space_id = ?", (space_id,) + ).fetchall() + return [self._row_to_permission(row) for row in rows] + + def get_permissions_for_principal( + self, principal_type: str, principal_id: str + ) -> List[SpacePermission]: + """Get all permissions for a principal.""" + with self._get_connection() as conn: + rows = conn.execute( + """ + SELECT * FROM space_permissions + WHERE principal_type = ? AND principal_id = ? + """, + (principal_type, principal_id), + ).fetchall() + return [self._row_to_permission(row) for row in rows] + + def get_permission( + self, space_id: str, principal_type: str, principal_id: str + ) -> Optional[SpacePermission]: + """Get a specific permission.""" + with self._get_connection() as conn: + row = conn.execute( + """ + SELECT * FROM space_permissions + WHERE space_id = ? AND principal_type = ? AND principal_id = ? + """, + (space_id, principal_type, principal_id), + ).fetchone() + if row: + return self._row_to_permission(row) + return None + + def clear_space_permissions(self, space_id: str) -> int: + """Clear all permissions for a space.""" + with self._get_connection() as conn: + cursor = conn.execute( + "DELETE FROM space_permissions WHERE space_id = ?", (space_id,) + ) + conn.commit() + return cursor.rowcount + + def _row_to_permission(self, row: sqlite3.Row) -> SpacePermission: + """Convert a database row to a SpacePermission.""" + expires_at = row["expires_at"] + if expires_at: + expires_at = datetime.fromisoformat(expires_at) + + role = row["role"] + if role: + role = SpaceRole(role) + + return SpacePermission( + space_id=row["space_id"], + principal_type=row["principal_type"], + principal_id=row["principal_id"], + access_level=AccessLevel(row["access_level"]), + role=role, + granted_by=row["granted_by"], + granted_at=datetime.fromisoformat(row["granted_at"]), + expires_at=expires_at, + ) diff --git a/markitect/spaces/composability/service.py b/markitect/spaces/composability/service.py new file mode 100644 index 00000000..4b0330ed --- /dev/null +++ b/markitect/spaces/composability/service.py @@ -0,0 +1,825 @@ +""" +Service layer for space composability. + +Provides inheritance resolution, reference management, and access control. +""" + +from typing import List, Optional, Dict, Any, Set +from dataclasses import dataclass + +from ..models import InformationSpace, SpaceVariable, SpaceConfig +from ..repositories.interfaces import ISpaceRepository, IVariableRepository +from .models import ( + SpaceReference, + SpaceReferenceType, + SpacePermission, + AccessLevel, + SpaceRole, + SpaceAccess, + InheritedVariable, + InheritedConfig, +) +from .repository import ISpaceReferenceRepository, IAccessControlRepository + + +class CircularReferenceError(Exception): + """Raised when a circular reference is detected.""" + pass + + +class InheritanceResolver: + """ + Resolves inherited values through the space hierarchy. + + Handles variable inheritance, config inheritance, and reference traversal. + """ + + def __init__( + self, + space_repo: ISpaceRepository, + variable_repo: IVariableRepository, + reference_repo: Optional[ISpaceReferenceRepository] = None, + max_depth: int = 10, + ): + """ + Initialize the resolver. + + Args: + space_repo: Repository for space lookups + variable_repo: Repository for variable lookups + reference_repo: Optional repository for reference lookups + max_depth: Maximum inheritance depth to prevent infinite loops + """ + self._space_repo = space_repo + self._variable_repo = variable_repo + self._reference_repo = reference_repo + self._max_depth = max_depth + + def get_parent_chain( + self, space_id: str, visited: Optional[Set[str]] = None + ) -> List[InformationSpace]: + """ + Get the chain of parent spaces. + + Args: + space_id: Starting space ID + visited: Set of already-visited space IDs (for cycle detection) + + Returns: + List of parent spaces, from immediate parent to root + + Raises: + CircularReferenceError: If a cycle is detected + """ + if visited is None: + visited = set() + + if space_id in visited: + raise CircularReferenceError( + f"Circular reference detected: space {space_id} already in chain" + ) + + visited.add(space_id) + space = self._space_repo.get_by_id(space_id) + + if not space or not space.parent_space_id: + return [] + + if len(visited) > self._max_depth: + raise CircularReferenceError( + f"Maximum inheritance depth ({self._max_depth}) exceeded" + ) + + parent = self._space_repo.get_by_id(space.parent_space_id) + if not parent: + return [] + + return [parent] + self.get_parent_chain(parent.id, visited) + + def resolve_variable( + self, space_id: str, name: str + ) -> Optional[InheritedVariable]: + """ + Resolve a variable, checking parent spaces if not found locally. + + Args: + space_id: The space to start resolution from + name: Variable name to resolve + + Returns: + InheritedVariable with value and source, or None if not found + """ + # Check local space first + local_var = self._variable_repo.get_variable(space_id, name) + if local_var: + return InheritedVariable( + name=name, + value=local_var.value, + source_space_id=space_id, + inheritance_depth=0, + scope=local_var.scope, + ) + + # Walk up parent chain + try: + parents = self.get_parent_chain(space_id) + except CircularReferenceError: + return None + + for depth, parent in enumerate(parents, start=1): + parent_var = self._variable_repo.get_variable(parent.id, name) + if parent_var: + return InheritedVariable( + name=name, + value=parent_var.value, + source_space_id=parent.id, + inheritance_depth=depth, + scope=parent_var.scope, + ) + + return None + + def resolve_all_variables( + self, space_id: str, scope: Optional[str] = None + ) -> Dict[str, InheritedVariable]: + """ + Resolve all variables for a space, including inherited ones. + + Variables defined locally override inherited ones with the same name. + + Args: + space_id: The space ID + scope: Optional scope filter + + Returns: + Dictionary of variable name to InheritedVariable + """ + result: Dict[str, InheritedVariable] = {} + + # Start from the furthest ancestor and work down + try: + parents = self.get_parent_chain(space_id) + except CircularReferenceError: + parents = [] + + # Build list from root to current: [grandparent, parent, current] + # parents is [parent, grandparent] so reverse it + current_space = self._space_repo.get_by_id(space_id) + all_spaces = list(reversed(parents)) + if current_space: + all_spaces.append(current_space) + + # Process from root to current so local values override inherited ones + # Index 0 is root (highest depth), last index is current (depth 0) + total = len(all_spaces) + for i, space in enumerate(all_spaces): + # depth_from_current: current=0, parent=1, grandparent=2, etc. + depth_from_current = total - 1 - i + variables = self._variable_repo.list_variables(space.id, scope) + for var in variables: + result[var.name] = InheritedVariable( + name=var.name, + value=var.value, + source_space_id=space.id, + inheritance_depth=depth_from_current, + scope=var.scope, + ) + + return result + + def resolve_config(self, space_id: str) -> InheritedConfig: + """ + Resolve the effective configuration for a space. + + Config values are inherited from parents unless overridden locally. + + Args: + space_id: The space ID + + Returns: + InheritedConfig with effective values and sources + """ + space = self._space_repo.get_by_id(space_id) + if not space: + return InheritedConfig() + + # Start with defaults + result = InheritedConfig( + default_variant="hierarchical", + enable_caching=True, + theme=None, + history_enabled=False, + variable_scope="space", + source_spaces={}, + ) + + # Get parent chain + try: + parents = self.get_parent_chain(space_id) + except CircularReferenceError: + parents = [] + + # Process from root to current (reversed order) + all_spaces = list(reversed(parents)) + [space] + + for s in all_spaces: + config = s.config + if config.default_variant: + result.default_variant = config.default_variant + result.source_spaces["default_variant"] = s.id + + if config.enable_caching is not None: + result.enable_caching = config.enable_caching + result.source_spaces["enable_caching"] = s.id + + if config.theme: + result.theme = config.theme + result.source_spaces["theme"] = s.id + + if config.history_enabled is not None: + result.history_enabled = config.history_enabled + result.source_spaces["history_enabled"] = s.id + + if config.variable_scope: + result.variable_scope = config.variable_scope + result.source_spaces["variable_scope"] = s.id + + return result + + +class AccessControlService: + """ + Service for managing space access control. + + Handles permission grants, checks, and inheritance. + """ + + def __init__( + self, + permission_repo: IAccessControlRepository, + space_repo: ISpaceRepository, + inheritance_resolver: Optional[InheritanceResolver] = None, + ): + """ + Initialize the service. + + Args: + permission_repo: Repository for permission storage + space_repo: Repository for space lookups + inheritance_resolver: Optional resolver for inherited permissions + """ + self._permission_repo = permission_repo + self._space_repo = space_repo + self._inheritance_resolver = inheritance_resolver + + def grant_permission( + self, + space_id: str, + principal_type: str, + principal_id: str, + access_level: AccessLevel, + role: Optional[SpaceRole] = None, + granted_by: Optional[str] = None, + ) -> SpacePermission: + """ + Grant a permission to a principal. + + Args: + space_id: The space ID + principal_type: Type of principal (user, group, role) + principal_id: Principal identifier + access_level: Access level to grant + role: Optional role assignment + granted_by: Who is granting the permission + + Returns: + The created permission + """ + permission = SpacePermission( + space_id=space_id, + principal_type=principal_type, + principal_id=principal_id, + access_level=access_level, + role=role, + granted_by=granted_by, + ) + return self._permission_repo.grant_permission(permission) + + def revoke_permission( + self, space_id: str, principal_type: str, principal_id: str + ) -> bool: + """ + Revoke a permission. + + Args: + space_id: The space ID + principal_type: Type of principal + principal_id: Principal identifier + + Returns: + True if revoked, False if not found + """ + return self._permission_repo.revoke_permission( + space_id, principal_type, principal_id + ) + + def check_access( + self, + space_id: str, + principal_type: str, + principal_id: str, + required_level: AccessLevel = AccessLevel.READ, + ) -> bool: + """ + Check if a principal has the required access level. + + Args: + space_id: The space ID + principal_type: Type of principal + principal_id: Principal identifier + required_level: Required access level + + Returns: + True if access is granted + """ + access = self.get_effective_access(space_id, principal_type, principal_id) + if access.effective_level == AccessLevel.NONE: + return False + # Compare using level order, not string values + level_order = [AccessLevel.NONE, AccessLevel.READ, AccessLevel.WRITE, AccessLevel.ADMIN] + return level_order.index(access.effective_level) >= level_order.index(required_level) + + def get_effective_access( + self, space_id: str, principal_type: str, principal_id: str + ) -> SpaceAccess: + """ + Get the effective access for a principal on a space. + + Considers direct permissions and inherited permissions from parent spaces. + + Args: + space_id: The space ID + principal_type: Type of principal + principal_id: Principal identifier + + Returns: + SpaceAccess with computed effective access + """ + result = SpaceAccess( + space_id=space_id, + principal_id=principal_id, + effective_level=AccessLevel.NONE, + roles=[], + inherited_from=[], + permissions=[], + ) + + # Check direct permission + direct = self._permission_repo.get_permission( + space_id, principal_type, principal_id + ) + if direct and not direct.is_expired(): + result.permissions.append(direct) + result.effective_level = direct.access_level + if direct.role: + result.roles.append(direct.role) + + # Check inherited permissions if we have a resolver + if self._inheritance_resolver: + try: + parents = self._inheritance_resolver.get_parent_chain(space_id) + except CircularReferenceError: + parents = [] + + for parent in parents: + parent_perm = self._permission_repo.get_permission( + parent.id, principal_type, principal_id + ) + if parent_perm and not parent_perm.is_expired(): + result.permissions.append(parent_perm) + result.inherited_from.append(parent.id) + + # Take the highest access level + level_order = [ + AccessLevel.NONE, + AccessLevel.READ, + AccessLevel.WRITE, + AccessLevel.ADMIN, + ] + if level_order.index(parent_perm.access_level) > level_order.index( + result.effective_level + ): + result.effective_level = parent_perm.access_level + + if parent_perm.role and parent_perm.role not in result.roles: + result.roles.append(parent_perm.role) + + return result + + def get_space_permissions(self, space_id: str) -> List[SpacePermission]: + """ + Get all permissions for a space. + + Args: + space_id: The space ID + + Returns: + List of permissions + """ + return self._permission_repo.get_permissions_for_space(space_id) + + def get_principal_permissions( + self, principal_type: str, principal_id: str + ) -> List[SpacePermission]: + """ + Get all permissions for a principal. + + Args: + principal_type: Type of principal + principal_id: Principal identifier + + Returns: + List of permissions + """ + return self._permission_repo.get_permissions_for_principal( + principal_type, principal_id + ) + + +class ComposableSpaceService: + """ + Extended space service with composability features. + + Provides space references, inheritance resolution, and access control + on top of the base SpaceService functionality. + """ + + def __init__( + self, + space_repo: ISpaceRepository, + variable_repo: IVariableRepository, + reference_repo: ISpaceReferenceRepository, + permission_repo: IAccessControlRepository, + max_inheritance_depth: int = 10, + ): + """ + Initialize the service. + + Args: + space_repo: Repository for spaces + variable_repo: Repository for variables + reference_repo: Repository for space references + permission_repo: Repository for permissions + max_inheritance_depth: Maximum inheritance chain depth + """ + self._space_repo = space_repo + self._variable_repo = variable_repo + self._reference_repo = reference_repo + self._permission_repo = permission_repo + + self._inheritance_resolver = InheritanceResolver( + space_repo=space_repo, + variable_repo=variable_repo, + reference_repo=reference_repo, + max_depth=max_inheritance_depth, + ) + + self._access_service = AccessControlService( + permission_repo=permission_repo, + space_repo=space_repo, + inheritance_resolver=self._inheritance_resolver, + ) + + # ========================================================================= + # Space References + # ========================================================================= + + def add_reference( + self, + source_space_id: str, + target_space_id: str, + reference_type: SpaceReferenceType = SpaceReferenceType.LINKS_TO, + alias: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> SpaceReference: + """ + Add a reference from one space to another. + + Args: + source_space_id: The space making the reference + target_space_id: The space being referenced + reference_type: Type of reference + alias: Optional alias for the target space + metadata: Optional metadata + + Returns: + The created reference + + Raises: + ValueError: If spaces don't exist or reference would create a cycle + CircularReferenceError: If reference would create a circular dependency + """ + # Validate spaces exist + if not self._space_repo.exists(source_space_id): + raise ValueError(f"Source space '{source_space_id}' not found") + if not self._space_repo.exists(target_space_id): + raise ValueError(f"Target space '{target_space_id}' not found") + + # Check for self-reference + if source_space_id == target_space_id: + raise ValueError("Cannot reference self") + + # Check for circular reference (for EXTENDS type) + if reference_type == SpaceReferenceType.EXTENDS: + self._check_circular_reference(source_space_id, target_space_id) + + reference = SpaceReference( + source_space_id=source_space_id, + target_space_id=target_space_id, + reference_type=reference_type, + alias=alias, + metadata=metadata or {}, + ) + + return self._reference_repo.add_reference(reference) + + def _check_circular_reference( + self, source_id: str, target_id: str, visited: Optional[Set[str]] = None + ) -> None: + """Check if adding a reference would create a cycle.""" + if visited is None: + visited = {source_id} + + if target_id in visited: + raise CircularReferenceError( + f"Adding reference would create cycle: {target_id} already in chain" + ) + + visited.add(target_id) + + # Check existing references from target + refs = self._reference_repo.get_references_from( + target_id, SpaceReferenceType.EXTENDS + ) + for ref in refs: + self._check_circular_reference(source_id, ref.target_space_id, visited) + + # Also check parent_space_id chain + target_space = self._space_repo.get_by_id(target_id) + if target_space and target_space.parent_space_id: + if target_space.parent_space_id in visited: + raise CircularReferenceError( + f"Adding reference would create cycle through parent: " + f"{target_space.parent_space_id}" + ) + self._check_circular_reference( + source_id, target_space.parent_space_id, visited + ) + + def remove_reference(self, reference_id: str) -> bool: + """ + Remove a space reference. + + Args: + reference_id: The reference ID + + Returns: + True if removed, False if not found + """ + return self._reference_repo.remove_reference(reference_id) + + def get_references_from( + self, + space_id: str, + reference_type: Optional[SpaceReferenceType] = None, + ) -> List[SpaceReference]: + """ + Get all references from a space. + + Args: + space_id: The source space ID + reference_type: Optional filter by type + + Returns: + List of references + """ + return self._reference_repo.get_references_from(space_id, reference_type) + + def get_references_to( + self, + space_id: str, + reference_type: Optional[SpaceReferenceType] = None, + ) -> List[SpaceReference]: + """ + Get all references to a space. + + Args: + space_id: The target space ID + reference_type: Optional filter by type + + Returns: + List of references + """ + return self._reference_repo.get_references_to(space_id, reference_type) + + def get_composed_spaces(self, space_id: str) -> List[InformationSpace]: + """ + Get all spaces that compose this space. + + Includes parent, extended spaces, and included spaces. + + Args: + space_id: The space ID + + Returns: + List of composed spaces + """ + result: List[InformationSpace] = [] + seen: Set[str] = {space_id} + + space = self._space_repo.get_by_id(space_id) + if not space: + return [] + + # Add parent + if space.parent_space_id: + parent = self._space_repo.get_by_id(space.parent_space_id) + if parent and parent.id not in seen: + result.append(parent) + seen.add(parent.id) + + # Add extended spaces + extends_refs = self._reference_repo.get_references_from( + space_id, SpaceReferenceType.EXTENDS + ) + for ref in extends_refs: + if ref.target_space_id not in seen: + target = self._space_repo.get_by_id(ref.target_space_id) + if target: + result.append(target) + seen.add(ref.target_space_id) + + # Add included spaces + includes_refs = self._reference_repo.get_references_from( + space_id, SpaceReferenceType.INCLUDES + ) + for ref in includes_refs: + if ref.target_space_id not in seen: + target = self._space_repo.get_by_id(ref.target_space_id) + if target: + result.append(target) + seen.add(ref.target_space_id) + + return result + + # ========================================================================= + # Inheritance Resolution + # ========================================================================= + + def resolve_variable( + self, space_id: str, name: str + ) -> Optional[InheritedVariable]: + """ + Resolve a variable with inheritance. + + Args: + space_id: The space ID + name: Variable name + + Returns: + InheritedVariable or None if not found + """ + return self._inheritance_resolver.resolve_variable(space_id, name) + + def resolve_all_variables( + self, space_id: str, scope: Optional[str] = None + ) -> Dict[str, InheritedVariable]: + """ + Resolve all variables including inherited ones. + + Args: + space_id: The space ID + scope: Optional scope filter + + Returns: + Dictionary of variable name to InheritedVariable + """ + return self._inheritance_resolver.resolve_all_variables(space_id, scope) + + def resolve_config(self, space_id: str) -> InheritedConfig: + """ + Resolve effective configuration with inheritance. + + Args: + space_id: The space ID + + Returns: + InheritedConfig with effective values + """ + return self._inheritance_resolver.resolve_config(space_id) + + def get_parent_chain(self, space_id: str) -> List[InformationSpace]: + """ + Get the parent space chain. + + Args: + space_id: The space ID + + Returns: + List of parent spaces from immediate parent to root + """ + return self._inheritance_resolver.get_parent_chain(space_id) + + # ========================================================================= + # Access Control + # ========================================================================= + + def grant_access( + self, + space_id: str, + principal_type: str, + principal_id: str, + access_level: AccessLevel, + role: Optional[SpaceRole] = None, + granted_by: Optional[str] = None, + ) -> SpacePermission: + """ + Grant access to a space. + + Args: + space_id: The space ID + principal_type: Type of principal (user, group) + principal_id: Principal identifier + access_level: Access level to grant + role: Optional role + granted_by: Who is granting access + + Returns: + The created permission + """ + return self._access_service.grant_permission( + space_id, principal_type, principal_id, access_level, role, granted_by + ) + + def revoke_access( + self, space_id: str, principal_type: str, principal_id: str + ) -> bool: + """ + Revoke access to a space. + + Args: + space_id: The space ID + principal_type: Type of principal + principal_id: Principal identifier + + Returns: + True if revoked + """ + return self._access_service.revoke_permission( + space_id, principal_type, principal_id + ) + + def check_access( + self, + space_id: str, + principal_type: str, + principal_id: str, + required_level: AccessLevel = AccessLevel.READ, + ) -> bool: + """ + Check if a principal has required access. + + Args: + space_id: The space ID + principal_type: Type of principal + principal_id: Principal identifier + required_level: Required access level + + Returns: + True if access is granted + """ + return self._access_service.check_access( + space_id, principal_type, principal_id, required_level + ) + + def get_effective_access( + self, space_id: str, principal_type: str, principal_id: str + ) -> SpaceAccess: + """ + Get effective access including inheritance. + + Args: + space_id: The space ID + principal_type: Type of principal + principal_id: Principal identifier + + Returns: + SpaceAccess with computed access + """ + return self._access_service.get_effective_access( + space_id, principal_type, principal_id + ) + + def get_space_permissions(self, space_id: str) -> List[SpacePermission]: + """Get all permissions for a space.""" + return self._access_service.get_space_permissions(space_id) diff --git a/tests/unit/spaces/test_composability.py b/tests/unit/spaces/test_composability.py new file mode 100644 index 00000000..71d57ea5 --- /dev/null +++ b/tests/unit/spaces/test_composability.py @@ -0,0 +1,1099 @@ +""" +Tests for Phase 7: Space Composability. + +Tests space references, variable/config inheritance, and access control. +""" + +import pytest +import tempfile +from pathlib import Path +from datetime import datetime, timedelta + +from markitect.spaces import ( + # Models + InformationSpace, + SpaceConfig, + SpaceMetadata, + SpaceVariable, + SpaceStatus, + # Composability Models + SpaceReference, + SpaceReferenceType, + SpacePermission, + SpaceRole, + AccessLevel, + SpaceAccess, + InheritedVariable, + InheritedConfig, + # Services + ComposableSpaceService, + InheritanceResolver, + AccessControlService, + CircularReferenceError, + # Repositories + SqliteSpaceRepository, + SqliteVariableRepository, + SqliteSpaceReferenceRepository, + SqliteAccessControlRepository, +) + + +# =========================================================================== +# Fixtures +# =========================================================================== + + +@pytest.fixture +def db_path(): + """Create a temporary database path.""" + with tempfile.TemporaryDirectory() as tmpdir: + yield Path(tmpdir) / "test.db" + + +@pytest.fixture +def space_repo(db_path): + """Create a space repository.""" + return SqliteSpaceRepository(db_path) + + +@pytest.fixture +def variable_repo(db_path): + """Create a variable repository.""" + return SqliteVariableRepository(db_path) + + +@pytest.fixture +def reference_repo(db_path): + """Create a reference repository.""" + return SqliteSpaceReferenceRepository(db_path) + + +@pytest.fixture +def permission_repo(db_path): + """Create a permission repository.""" + return SqliteAccessControlRepository(db_path) + + +@pytest.fixture +def inheritance_resolver(space_repo, variable_repo, reference_repo): + """Create an inheritance resolver.""" + return InheritanceResolver( + space_repo=space_repo, + variable_repo=variable_repo, + reference_repo=reference_repo, + ) + + +@pytest.fixture +def access_service(permission_repo, space_repo, inheritance_resolver): + """Create an access control service.""" + return AccessControlService( + permission_repo=permission_repo, + space_repo=space_repo, + inheritance_resolver=inheritance_resolver, + ) + + +@pytest.fixture +def composable_service(space_repo, variable_repo, reference_repo, permission_repo): + """Create a composable space service.""" + return ComposableSpaceService( + space_repo=space_repo, + variable_repo=variable_repo, + reference_repo=reference_repo, + permission_repo=permission_repo, + ) + + +# =========================================================================== +# SpaceReference Model Tests +# =========================================================================== + + +class TestSpaceReferenceModel: + """Tests for SpaceReference model.""" + + def test_create_reference(self): + """Test creating a space reference.""" + ref = SpaceReference( + source_space_id="space-1", + target_space_id="space-2", + reference_type=SpaceReferenceType.INCLUDES, + ) + assert ref.source_space_id == "space-1" + assert ref.target_space_id == "space-2" + assert ref.reference_type == SpaceReferenceType.INCLUDES + assert ref.id is not None + + def test_reference_with_alias(self): + """Test reference with alias.""" + ref = SpaceReference( + source_space_id="space-1", + target_space_id="space-2", + reference_type=SpaceReferenceType.LINKS_TO, + alias="common-components", + ) + assert ref.alias == "common-components" + + def test_reference_to_dict(self): + """Test serialization to dict.""" + ref = SpaceReference( + source_space_id="space-1", + target_space_id="space-2", + reference_type=SpaceReferenceType.EXTENDS, + alias="base", + metadata={"priority": 1}, + ) + data = ref.to_dict() + assert data["source_space_id"] == "space-1" + assert data["target_space_id"] == "space-2" + assert data["reference_type"] == "extends" + assert data["alias"] == "base" + assert data["metadata"]["priority"] == 1 + + def test_reference_from_dict(self): + """Test deserialization from dict.""" + data = { + "id": "ref-1", + "source_space_id": "space-1", + "target_space_id": "space-2", + "reference_type": "composed_of", + "alias": "part-a", + "metadata": {"order": 1}, + "created_at": "2024-01-01T00:00:00", + } + ref = SpaceReference.from_dict(data) + assert ref.id == "ref-1" + assert ref.reference_type == SpaceReferenceType.COMPOSED_OF + assert ref.alias == "part-a" + + +# =========================================================================== +# SpacePermission Model Tests +# =========================================================================== + + +class TestSpacePermissionModel: + """Tests for SpacePermission model.""" + + def test_create_permission(self): + """Test creating a permission.""" + perm = SpacePermission( + space_id="space-1", + principal_type="user", + principal_id="user-1", + access_level=AccessLevel.WRITE, + ) + assert perm.space_id == "space-1" + assert perm.principal_id == "user-1" + assert perm.access_level == AccessLevel.WRITE + + def test_permission_with_role(self): + """Test permission with role.""" + perm = SpacePermission( + space_id="space-1", + principal_type="user", + principal_id="user-1", + access_level=AccessLevel.ADMIN, + role=SpaceRole.OWNER, + ) + assert perm.role == SpaceRole.OWNER + + def test_permission_expiration(self): + """Test permission expiration check.""" + # Not expired + perm = SpacePermission( + space_id="space-1", + principal_type="user", + principal_id="user-1", + access_level=AccessLevel.READ, + expires_at=datetime.now() + timedelta(days=1), + ) + assert not perm.is_expired() + + # Expired + perm_expired = SpacePermission( + space_id="space-1", + principal_type="user", + principal_id="user-2", + access_level=AccessLevel.READ, + expires_at=datetime.now() - timedelta(days=1), + ) + assert perm_expired.is_expired() + + def test_has_access(self): + """Test has_access check.""" + perm = SpacePermission( + space_id="space-1", + principal_type="user", + principal_id="user-1", + access_level=AccessLevel.WRITE, + ) + assert perm.has_access(AccessLevel.READ) + assert perm.has_access(AccessLevel.WRITE) + assert not perm.has_access(AccessLevel.ADMIN) + + def test_permission_to_dict(self): + """Test serialization.""" + perm = SpacePermission( + space_id="space-1", + principal_type="user", + principal_id="user-1", + access_level=AccessLevel.ADMIN, + role=SpaceRole.OWNER, + granted_by="admin", + ) + data = perm.to_dict() + assert data["access_level"] == "admin" + assert data["role"] == "owner" + assert data["granted_by"] == "admin" + + +# =========================================================================== +# SpaceAccess Model Tests +# =========================================================================== + + +class TestSpaceAccessModel: + """Tests for SpaceAccess model.""" + + def test_access_checks(self): + """Test access level checks.""" + access = SpaceAccess( + space_id="space-1", + principal_id="user-1", + effective_level=AccessLevel.WRITE, + ) + assert access.can_read() + assert access.can_write() + assert not access.is_admin() + + def test_admin_access(self): + """Test admin access.""" + access = SpaceAccess( + space_id="space-1", + principal_id="user-1", + effective_level=AccessLevel.ADMIN, + roles=[SpaceRole.OWNER], + ) + assert access.can_read() + assert access.can_write() + assert access.is_admin() + + def test_no_access(self): + """Test no access.""" + access = SpaceAccess( + space_id="space-1", + principal_id="user-1", + effective_level=AccessLevel.NONE, + ) + assert not access.can_read() + assert not access.can_write() + assert not access.is_admin() + + +# =========================================================================== +# InheritedVariable Tests +# =========================================================================== + + +class TestInheritedVariable: + """Tests for InheritedVariable.""" + + def test_local_variable(self): + """Test local variable detection.""" + var = InheritedVariable( + name="version", + value="1.0", + source_space_id="space-1", + inheritance_depth=0, + ) + assert var.is_local() + + def test_inherited_variable(self): + """Test inherited variable.""" + var = InheritedVariable( + name="company", + value="Acme Corp", + source_space_id="parent-space", + inheritance_depth=2, + ) + assert not var.is_local() + assert var.inheritance_depth == 2 + + +# =========================================================================== +# InheritedConfig Tests +# =========================================================================== + + +class TestInheritedConfig: + """Tests for InheritedConfig.""" + + def test_default_config(self): + """Test default config values.""" + config = InheritedConfig() + assert config.default_variant == "hierarchical" + assert config.enable_caching is True + assert config.theme is None + + def test_source_tracking(self): + """Test config source tracking.""" + config = InheritedConfig( + theme="dark", + source_spaces={"theme": "parent-space"}, + ) + assert config.get_source("theme") == "parent-space" + assert config.is_inherited("theme", "child-space") + assert not config.is_inherited("theme", "parent-space") + + +# =========================================================================== +# SpaceReferenceRepository Tests +# =========================================================================== + + +class TestSqliteSpaceReferenceRepository: + """Tests for SQLite space reference repository.""" + + def test_add_reference(self, reference_repo): + """Test adding a reference.""" + ref = SpaceReference( + source_space_id="space-1", + target_space_id="space-2", + reference_type=SpaceReferenceType.INCLUDES, + ) + saved = reference_repo.add_reference(ref) + assert saved.id == ref.id + + def test_get_reference(self, reference_repo): + """Test getting a reference by ID.""" + ref = SpaceReference( + source_space_id="space-1", + target_space_id="space-2", + reference_type=SpaceReferenceType.LINKS_TO, + ) + reference_repo.add_reference(ref) + + found = reference_repo.get_reference(ref.id) + assert found is not None + assert found.source_space_id == "space-1" + + def test_get_references_from(self, reference_repo): + """Test getting references from a source.""" + ref1 = SpaceReference( + source_space_id="space-1", + target_space_id="space-2", + reference_type=SpaceReferenceType.INCLUDES, + ) + ref2 = SpaceReference( + source_space_id="space-1", + target_space_id="space-3", + reference_type=SpaceReferenceType.LINKS_TO, + ) + reference_repo.add_reference(ref1) + reference_repo.add_reference(ref2) + + refs = reference_repo.get_references_from("space-1") + assert len(refs) == 2 + + # Filter by type + refs_includes = reference_repo.get_references_from( + "space-1", SpaceReferenceType.INCLUDES + ) + assert len(refs_includes) == 1 + assert refs_includes[0].target_space_id == "space-2" + + def test_get_references_to(self, reference_repo): + """Test getting references to a target.""" + ref1 = SpaceReference( + source_space_id="space-1", + target_space_id="space-3", + reference_type=SpaceReferenceType.INCLUDES, + ) + ref2 = SpaceReference( + source_space_id="space-2", + target_space_id="space-3", + reference_type=SpaceReferenceType.LINKS_TO, + ) + reference_repo.add_reference(ref1) + reference_repo.add_reference(ref2) + + refs = reference_repo.get_references_to("space-3") + assert len(refs) == 2 + + def test_remove_reference(self, reference_repo): + """Test removing a reference.""" + ref = SpaceReference( + source_space_id="space-1", + target_space_id="space-2", + reference_type=SpaceReferenceType.INCLUDES, + ) + reference_repo.add_reference(ref) + assert reference_repo.remove_reference(ref.id) + assert reference_repo.get_reference(ref.id) is None + + def test_reference_exists(self, reference_repo): + """Test checking if reference exists.""" + ref = SpaceReference( + source_space_id="space-1", + target_space_id="space-2", + reference_type=SpaceReferenceType.EXTENDS, + ) + reference_repo.add_reference(ref) + + assert reference_repo.reference_exists( + "space-1", "space-2", SpaceReferenceType.EXTENDS + ) + assert not reference_repo.reference_exists( + "space-1", "space-2", SpaceReferenceType.INCLUDES + ) + + +# =========================================================================== +# AccessControlRepository Tests +# =========================================================================== + + +class TestSqliteAccessControlRepository: + """Tests for SQLite access control repository.""" + + def test_grant_permission(self, permission_repo): + """Test granting a permission.""" + perm = SpacePermission( + space_id="space-1", + principal_type="user", + principal_id="user-1", + access_level=AccessLevel.WRITE, + ) + saved = permission_repo.grant_permission(perm) + assert saved.space_id == "space-1" + + def test_get_permission(self, permission_repo): + """Test getting a specific permission.""" + perm = SpacePermission( + space_id="space-1", + principal_type="user", + principal_id="user-1", + access_level=AccessLevel.READ, + ) + permission_repo.grant_permission(perm) + + found = permission_repo.get_permission("space-1", "user", "user-1") + assert found is not None + assert found.access_level == AccessLevel.READ + + def test_revoke_permission(self, permission_repo): + """Test revoking a permission.""" + perm = SpacePermission( + space_id="space-1", + principal_type="user", + principal_id="user-1", + access_level=AccessLevel.ADMIN, + ) + permission_repo.grant_permission(perm) + assert permission_repo.revoke_permission("space-1", "user", "user-1") + assert permission_repo.get_permission("space-1", "user", "user-1") is None + + def test_get_permissions_for_space(self, permission_repo): + """Test getting all permissions for a space.""" + perm1 = SpacePermission( + space_id="space-1", + principal_type="user", + principal_id="user-1", + access_level=AccessLevel.READ, + ) + perm2 = SpacePermission( + space_id="space-1", + principal_type="user", + principal_id="user-2", + access_level=AccessLevel.WRITE, + ) + permission_repo.grant_permission(perm1) + permission_repo.grant_permission(perm2) + + perms = permission_repo.get_permissions_for_space("space-1") + assert len(perms) == 2 + + def test_get_permissions_for_principal(self, permission_repo): + """Test getting all permissions for a principal.""" + perm1 = SpacePermission( + space_id="space-1", + principal_type="user", + principal_id="user-1", + access_level=AccessLevel.READ, + ) + perm2 = SpacePermission( + space_id="space-2", + principal_type="user", + principal_id="user-1", + access_level=AccessLevel.ADMIN, + ) + permission_repo.grant_permission(perm1) + permission_repo.grant_permission(perm2) + + perms = permission_repo.get_permissions_for_principal("user", "user-1") + assert len(perms) == 2 + + +# =========================================================================== +# InheritanceResolver Tests +# =========================================================================== + + +class TestInheritanceResolver: + """Tests for inheritance resolution.""" + + def test_get_parent_chain_no_parent(self, inheritance_resolver, space_repo): + """Test getting parent chain for space without parent.""" + space = InformationSpace(name="root-space") + space_repo.create(space) + + chain = inheritance_resolver.get_parent_chain(space.id) + assert chain == [] + + def test_get_parent_chain_single_parent(self, inheritance_resolver, space_repo): + """Test getting parent chain with one parent.""" + parent = InformationSpace(name="parent-space") + space_repo.create(parent) + + child = InformationSpace(name="child-space", parent_space_id=parent.id) + space_repo.create(child) + + chain = inheritance_resolver.get_parent_chain(child.id) + assert len(chain) == 1 + assert chain[0].id == parent.id + + def test_get_parent_chain_multiple_levels(self, inheritance_resolver, space_repo): + """Test getting parent chain with multiple levels.""" + grandparent = InformationSpace(name="grandparent") + space_repo.create(grandparent) + + parent = InformationSpace(name="parent", parent_space_id=grandparent.id) + space_repo.create(parent) + + child = InformationSpace(name="child", parent_space_id=parent.id) + space_repo.create(child) + + chain = inheritance_resolver.get_parent_chain(child.id) + assert len(chain) == 2 + assert chain[0].id == parent.id + assert chain[1].id == grandparent.id + + def test_resolve_variable_local(self, inheritance_resolver, space_repo, variable_repo): + """Test resolving a local variable.""" + space = InformationSpace(name="test-space") + space_repo.create(space) + + var = SpaceVariable(space_id=space.id, name="version", value="1.0") + variable_repo.set_variable(var) + + resolved = inheritance_resolver.resolve_variable(space.id, "version") + assert resolved is not None + assert resolved.value == "1.0" + assert resolved.is_local() + + def test_resolve_variable_inherited( + self, inheritance_resolver, space_repo, variable_repo + ): + """Test resolving an inherited variable.""" + parent = InformationSpace(name="parent-space") + space_repo.create(parent) + + child = InformationSpace(name="child-space", parent_space_id=parent.id) + space_repo.create(child) + + # Set variable in parent only + var = SpaceVariable(space_id=parent.id, name="company", value="Acme") + variable_repo.set_variable(var) + + resolved = inheritance_resolver.resolve_variable(child.id, "company") + assert resolved is not None + assert resolved.value == "Acme" + assert not resolved.is_local() + assert resolved.source_space_id == parent.id + + def test_resolve_variable_override( + self, inheritance_resolver, space_repo, variable_repo + ): + """Test that local variable overrides inherited.""" + parent = InformationSpace(name="parent-space") + space_repo.create(parent) + + child = InformationSpace(name="child-space", parent_space_id=parent.id) + space_repo.create(child) + + # Set in both + var_parent = SpaceVariable(space_id=parent.id, name="version", value="1.0") + variable_repo.set_variable(var_parent) + + var_child = SpaceVariable(space_id=child.id, name="version", value="2.0") + variable_repo.set_variable(var_child) + + resolved = inheritance_resolver.resolve_variable(child.id, "version") + assert resolved is not None + assert resolved.value == "2.0" + assert resolved.is_local() + + def test_resolve_all_variables( + self, inheritance_resolver, space_repo, variable_repo + ): + """Test resolving all variables including inherited.""" + parent = InformationSpace(name="parent") + space_repo.create(parent) + + child = InformationSpace(name="child", parent_space_id=parent.id) + space_repo.create(child) + + # Parent variables + variable_repo.set_variable( + SpaceVariable(space_id=parent.id, name="company", value="Acme") + ) + variable_repo.set_variable( + SpaceVariable(space_id=parent.id, name="version", value="1.0") + ) + + # Child variables (one override, one new) + variable_repo.set_variable( + SpaceVariable(space_id=child.id, name="version", value="2.0") + ) + variable_repo.set_variable( + SpaceVariable(space_id=child.id, name="author", value="John") + ) + + all_vars = inheritance_resolver.resolve_all_variables(child.id) + assert len(all_vars) == 3 + assert all_vars["company"].value == "Acme" + assert not all_vars["company"].is_local() + assert all_vars["version"].value == "2.0" + assert all_vars["version"].is_local() + assert all_vars["author"].value == "John" + + def test_resolve_config_inheritance(self, inheritance_resolver, space_repo): + """Test resolving config with inheritance.""" + parent = InformationSpace( + name="parent", + config=SpaceConfig(theme="dark", enable_caching=True), + ) + space_repo.create(parent) + + child = InformationSpace( + name="child", + parent_space_id=parent.id, + config=SpaceConfig(default_variant="flat"), + ) + space_repo.create(child) + + config = inheritance_resolver.resolve_config(child.id) + assert config.theme == "dark" # inherited + assert config.default_variant == "flat" # local + assert config.source_spaces["theme"] == parent.id + + +# =========================================================================== +# AccessControlService Tests +# =========================================================================== + + +class TestAccessControlService: + """Tests for access control service.""" + + def test_grant_permission(self, access_service, space_repo): + """Test granting a permission.""" + space = InformationSpace(name="test-space") + space_repo.create(space) + + perm = access_service.grant_permission( + space_id=space.id, + principal_type="user", + principal_id="user-1", + access_level=AccessLevel.WRITE, + ) + assert perm.access_level == AccessLevel.WRITE + + def test_check_access(self, access_service, space_repo): + """Test checking access.""" + space = InformationSpace(name="test-space") + space_repo.create(space) + + access_service.grant_permission( + space_id=space.id, + principal_type="user", + principal_id="user-1", + access_level=AccessLevel.WRITE, + ) + + assert access_service.check_access( + space.id, "user", "user-1", AccessLevel.READ + ) + assert access_service.check_access( + space.id, "user", "user-1", AccessLevel.WRITE + ) + assert not access_service.check_access( + space.id, "user", "user-1", AccessLevel.ADMIN + ) + + def test_revoke_permission(self, access_service, space_repo): + """Test revoking access.""" + space = InformationSpace(name="test-space") + space_repo.create(space) + + access_service.grant_permission( + space_id=space.id, + principal_type="user", + principal_id="user-1", + access_level=AccessLevel.READ, + ) + + assert access_service.revoke_permission(space.id, "user", "user-1") + assert not access_service.check_access( + space.id, "user", "user-1", AccessLevel.READ + ) + + def test_effective_access_direct(self, access_service, space_repo): + """Test getting effective access with direct permission.""" + space = InformationSpace(name="test-space") + space_repo.create(space) + + access_service.grant_permission( + space_id=space.id, + principal_type="user", + principal_id="user-1", + access_level=AccessLevel.ADMIN, + role=SpaceRole.OWNER, + ) + + access = access_service.get_effective_access(space.id, "user", "user-1") + assert access.effective_level == AccessLevel.ADMIN + assert SpaceRole.OWNER in access.roles + + def test_effective_access_inherited(self, access_service, space_repo): + """Test getting effective access with inherited permission.""" + parent = InformationSpace(name="parent") + space_repo.create(parent) + + child = InformationSpace(name="child", parent_space_id=parent.id) + space_repo.create(child) + + # Grant on parent only + access_service.grant_permission( + space_id=parent.id, + principal_type="user", + principal_id="user-1", + access_level=AccessLevel.READ, + ) + + access = access_service.get_effective_access(child.id, "user", "user-1") + assert access.effective_level == AccessLevel.READ + assert parent.id in access.inherited_from + + +# =========================================================================== +# ComposableSpaceService Tests +# =========================================================================== + + +class TestComposableSpaceService: + """Tests for composable space service.""" + + def test_add_reference(self, composable_service, space_repo): + """Test adding a space reference.""" + space1 = InformationSpace(name="space-1") + space2 = InformationSpace(name="space-2") + space_repo.create(space1) + space_repo.create(space2) + + ref = composable_service.add_reference( + source_space_id=space1.id, + target_space_id=space2.id, + reference_type=SpaceReferenceType.INCLUDES, + alias="shared", + ) + assert ref.alias == "shared" + + def test_add_reference_invalid_source(self, composable_service, space_repo): + """Test adding reference with invalid source.""" + space = InformationSpace(name="valid-space") + space_repo.create(space) + + with pytest.raises(ValueError, match="Source space.*not found"): + composable_service.add_reference( + source_space_id="invalid-id", + target_space_id=space.id, + reference_type=SpaceReferenceType.LINKS_TO, + ) + + def test_add_reference_self_reference(self, composable_service, space_repo): + """Test that self-reference is rejected.""" + space = InformationSpace(name="test-space") + space_repo.create(space) + + with pytest.raises(ValueError, match="Cannot reference self"): + composable_service.add_reference( + source_space_id=space.id, + target_space_id=space.id, + reference_type=SpaceReferenceType.INCLUDES, + ) + + def test_add_reference_circular_detection(self, composable_service, space_repo): + """Test circular reference detection for EXTENDS type.""" + space1 = InformationSpace(name="space-1") + space2 = InformationSpace(name="space-2") + space3 = InformationSpace(name="space-3") + space_repo.create(space1) + space_repo.create(space2) + space_repo.create(space3) + + # space1 -> space2 -> space3 + composable_service.add_reference( + space1.id, space2.id, SpaceReferenceType.EXTENDS + ) + composable_service.add_reference( + space2.id, space3.id, SpaceReferenceType.EXTENDS + ) + + # space3 -> space1 would create a cycle + with pytest.raises(CircularReferenceError): + composable_service.add_reference( + space3.id, space1.id, SpaceReferenceType.EXTENDS + ) + + def test_remove_reference(self, composable_service, space_repo): + """Test removing a reference.""" + space1 = InformationSpace(name="space-1") + space2 = InformationSpace(name="space-2") + space_repo.create(space1) + space_repo.create(space2) + + ref = composable_service.add_reference( + space1.id, space2.id, SpaceReferenceType.LINKS_TO + ) + assert composable_service.remove_reference(ref.id) + + def test_get_references_from_and_to(self, composable_service, space_repo): + """Test getting references.""" + space1 = InformationSpace(name="space-1") + space2 = InformationSpace(name="space-2") + space3 = InformationSpace(name="space-3") + space_repo.create(space1) + space_repo.create(space2) + space_repo.create(space3) + + composable_service.add_reference( + space1.id, space2.id, SpaceReferenceType.INCLUDES + ) + composable_service.add_reference( + space1.id, space3.id, SpaceReferenceType.LINKS_TO + ) + + refs_from = composable_service.get_references_from(space1.id) + assert len(refs_from) == 2 + + refs_to = composable_service.get_references_to(space2.id) + assert len(refs_to) == 1 + + def test_get_composed_spaces(self, composable_service, space_repo): + """Test getting all composed spaces.""" + parent = InformationSpace(name="parent") + space_repo.create(parent) + + main = InformationSpace(name="main", parent_space_id=parent.id) + space_repo.create(main) + + included = InformationSpace(name="included") + extended = InformationSpace(name="extended") + space_repo.create(included) + space_repo.create(extended) + + composable_service.add_reference( + main.id, included.id, SpaceReferenceType.INCLUDES + ) + composable_service.add_reference( + main.id, extended.id, SpaceReferenceType.EXTENDS + ) + + composed = composable_service.get_composed_spaces(main.id) + assert len(composed) == 3 # parent, included, extended + ids = {s.id for s in composed} + assert parent.id in ids + assert included.id in ids + assert extended.id in ids + + def test_resolve_variable(self, composable_service, space_repo, variable_repo): + """Test variable resolution via service.""" + parent = InformationSpace(name="parent") + space_repo.create(parent) + + child = InformationSpace(name="child", parent_space_id=parent.id) + space_repo.create(child) + + variable_repo.set_variable( + SpaceVariable(space_id=parent.id, name="inherited_var", value="from parent") + ) + + resolved = composable_service.resolve_variable(child.id, "inherited_var") + assert resolved is not None + assert resolved.value == "from parent" + + def test_resolve_config(self, composable_service, space_repo): + """Test config resolution via service.""" + space = InformationSpace( + name="test-space", config=SpaceConfig(theme="minimal") + ) + space_repo.create(space) + + config = composable_service.resolve_config(space.id) + assert config.theme == "minimal" + + def test_grant_and_check_access(self, composable_service, space_repo): + """Test access control via service.""" + space = InformationSpace(name="test-space") + space_repo.create(space) + + composable_service.grant_access( + space_id=space.id, + principal_type="user", + principal_id="user-1", + access_level=AccessLevel.WRITE, + role=SpaceRole.EDITOR, + ) + + assert composable_service.check_access( + space.id, "user", "user-1", AccessLevel.WRITE + ) + assert not composable_service.check_access( + space.id, "user", "user-1", AccessLevel.ADMIN + ) + + def test_revoke_access(self, composable_service, space_repo): + """Test revoking access via service.""" + space = InformationSpace(name="test-space") + space_repo.create(space) + + composable_service.grant_access( + space_id=space.id, + principal_type="user", + principal_id="user-1", + access_level=AccessLevel.READ, + ) + + assert composable_service.revoke_access(space.id, "user", "user-1") + assert not composable_service.check_access( + space.id, "user", "user-1", AccessLevel.READ + ) + + def test_get_parent_chain(self, composable_service, space_repo): + """Test getting parent chain via service.""" + grandparent = InformationSpace(name="grandparent") + space_repo.create(grandparent) + + parent = InformationSpace(name="parent", parent_space_id=grandparent.id) + space_repo.create(parent) + + child = InformationSpace(name="child", parent_space_id=parent.id) + space_repo.create(child) + + chain = composable_service.get_parent_chain(child.id) + assert len(chain) == 2 + assert chain[0].name == "parent" + assert chain[1].name == "grandparent" + + def test_get_space_permissions(self, composable_service, space_repo): + """Test getting space permissions.""" + space = InformationSpace(name="test-space") + space_repo.create(space) + + composable_service.grant_access( + space.id, "user", "user-1", AccessLevel.READ + ) + composable_service.grant_access( + space.id, "user", "user-2", AccessLevel.WRITE + ) + + perms = composable_service.get_space_permissions(space.id) + assert len(perms) == 2 + + +# =========================================================================== +# Edge Cases and Error Handling +# =========================================================================== + + +class TestEdgeCases: + """Tests for edge cases and error handling.""" + + def test_max_inheritance_depth(self, space_repo, variable_repo): + """Test max inheritance depth is enforced.""" + resolver = InheritanceResolver( + space_repo=space_repo, + variable_repo=variable_repo, + max_depth=3, + ) + + # Create a chain of 5 spaces + spaces = [] + for i in range(5): + parent_id = spaces[-1].id if spaces else None + space = InformationSpace(name=f"space-{i}", parent_space_id=parent_id) + space_repo.create(space) + spaces.append(space) + + # Getting parent chain from the deepest should raise error + with pytest.raises(CircularReferenceError, match="Maximum inheritance depth"): + resolver.get_parent_chain(spaces[-1].id) + + def test_permission_expiration_handling(self, access_service, space_repo): + """Test that expired permissions are not counted.""" + space = InformationSpace(name="test-space") + space_repo.create(space) + + # Grant expired permission directly in repo + expired_perm = SpacePermission( + space_id=space.id, + principal_type="user", + principal_id="user-1", + access_level=AccessLevel.ADMIN, + expires_at=datetime.now() - timedelta(days=1), + ) + access_service._permission_repo.grant_permission(expired_perm) + + # Should not have access + access = access_service.get_effective_access(space.id, "user", "user-1") + assert access.effective_level == AccessLevel.NONE + + def test_nonexistent_variable(self, inheritance_resolver, space_repo): + """Test resolving nonexistent variable returns None.""" + space = InformationSpace(name="test-space") + space_repo.create(space) + + resolved = inheritance_resolver.resolve_variable(space.id, "nonexistent") + assert resolved is None + + def test_reference_type_filtering(self, composable_service, space_repo): + """Test filtering references by type.""" + space1 = InformationSpace(name="space-1") + space2 = InformationSpace(name="space-2") + space3 = InformationSpace(name="space-3") + space_repo.create(space1) + space_repo.create(space2) + space_repo.create(space3) + + composable_service.add_reference( + space1.id, space2.id, SpaceReferenceType.INCLUDES + ) + composable_service.add_reference( + space1.id, space3.id, SpaceReferenceType.EXTENDS + ) + + # Filter by type + includes = composable_service.get_references_from( + space1.id, SpaceReferenceType.INCLUDES + ) + assert len(includes) == 1 + assert includes[0].target_space_id == space2.id + + extends = composable_service.get_references_from( + space1.id, SpaceReferenceType.EXTENDS + ) + assert len(extends) == 1 + assert extends[0].target_space_id == space3.id