feat(spaces): implement Phase 7 Composability

Implements space composition and inheritance features:
- SpaceReference model for space-to-space references (includes, extends, links_to, composed_of)
- Variable inheritance through parent chain with local override
- Config inheritance with source tracking
- Access control models (SpacePermission, SpaceRole, AccessLevel)
- InheritanceResolver for walking parent chains
- AccessControlService for permission management
- ComposableSpaceService integrating all composability features
- Circular reference detection for EXTENDS references
- SQLite repositories for references and permissions
- 57 comprehensive unit tests

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-08 17:41:40 +01:00
parent 7de57a389d
commit 727ce4d3c5
6 changed files with 2710 additions and 0 deletions

View File

@@ -60,6 +60,29 @@ from .events import (
reset_event_bus,
)
# Phase 7: Composability
from .composability import (
# Models
SpaceReference,
SpaceReferenceType,
SpacePermission,
SpaceRole,
AccessLevel,
SpaceAccess,
InheritedVariable,
InheritedConfig,
# Services
ComposableSpaceService,
InheritanceResolver,
AccessControlService,
# Repositories
ISpaceReferenceRepository,
IAccessControlRepository,
SqliteSpaceReferenceRepository,
SqliteAccessControlRepository,
)
from .composability.service import CircularReferenceError
__all__ = [
# Models
"InformationSpace",
@@ -88,4 +111,23 @@ __all__ = [
"EventBus",
"get_event_bus",
"reset_event_bus",
# Composability - Models
"SpaceReference",
"SpaceReferenceType",
"SpacePermission",
"SpaceRole",
"AccessLevel",
"SpaceAccess",
"InheritedVariable",
"InheritedConfig",
# Composability - Services
"ComposableSpaceService",
"InheritanceResolver",
"AccessControlService",
"CircularReferenceError",
# Composability - Repositories
"ISpaceReferenceRepository",
"IAccessControlRepository",
"SqliteSpaceReferenceRepository",
"SqliteAccessControlRepository",
]

View File

@@ -0,0 +1,49 @@
"""
Composability module for Information Spaces.
Provides space-to-space references, variable/config inheritance,
and basic access control for the Information Space system.
"""
from .models import (
SpaceReference,
SpaceReferenceType,
SpacePermission,
SpaceRole,
AccessLevel,
SpaceAccess,
InheritedVariable,
InheritedConfig,
)
from .service import (
ComposableSpaceService,
InheritanceResolver,
AccessControlService,
)
from .repository import (
ISpaceReferenceRepository,
IAccessControlRepository,
SqliteSpaceReferenceRepository,
SqliteAccessControlRepository,
)
__all__ = [
# Models
"SpaceReference",
"SpaceReferenceType",
"SpacePermission",
"SpaceRole",
"AccessLevel",
"SpaceAccess",
"InheritedVariable",
"InheritedConfig",
# Services
"ComposableSpaceService",
"InheritanceResolver",
"AccessControlService",
# Repositories
"ISpaceReferenceRepository",
"IAccessControlRepository",
"SqliteSpaceReferenceRepository",
"SqliteAccessControlRepository",
]

View File

@@ -0,0 +1,271 @@
"""
Domain models for space composability.
Defines models for space references, inheritance, and access control.
"""
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, Any, List, Optional
from enum import Enum
class SpaceReferenceType(Enum):
"""Type of reference between spaces."""
INCLUDES = "includes" # Space A includes content from Space B
EXTENDS = "extends" # Space A extends/inherits from Space B
LINKS_TO = "links_to" # Space A has a soft link to Space B
COMPOSED_OF = "composed_of" # Space A is composed of multiple spaces
@dataclass
class SpaceReference:
"""
Represents a reference between two spaces.
Unlike parent_space_id (single inheritance), space references allow
multiple relationships between spaces for composition.
Attributes:
id: Unique reference identifier
source_space_id: The space making the reference
target_space_id: The space being referenced
reference_type: Type of reference relationship
alias: Optional alias for the referenced space
metadata: Additional reference metadata
created_at: When the reference was created
"""
id: str = field(default_factory=lambda: str(uuid.uuid4()))
source_space_id: str = ""
target_space_id: str = ""
reference_type: SpaceReferenceType = SpaceReferenceType.LINKS_TO
alias: Optional[str] = None
metadata: Dict[str, Any] = field(default_factory=dict)
created_at: datetime = field(default_factory=datetime.now)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for serialization."""
return {
"id": self.id,
"source_space_id": self.source_space_id,
"target_space_id": self.target_space_id,
"reference_type": self.reference_type.value,
"alias": self.alias,
"metadata": self.metadata,
"created_at": self.created_at.isoformat(),
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "SpaceReference":
"""Create from dictionary."""
created_at = data.get("created_at")
if isinstance(created_at, str):
created_at = datetime.fromisoformat(created_at)
elif created_at is None:
created_at = datetime.now()
ref_type = data.get("reference_type", "links_to")
if isinstance(ref_type, str):
ref_type = SpaceReferenceType(ref_type)
return cls(
id=data.get("id", str(uuid.uuid4())),
source_space_id=data.get("source_space_id", ""),
target_space_id=data.get("target_space_id", ""),
reference_type=ref_type,
alias=data.get("alias"),
metadata=data.get("metadata", {}),
created_at=created_at,
)
class AccessLevel(Enum):
"""Access level for space permissions."""
NONE = "none" # No access
READ = "read" # Read-only access
WRITE = "write" # Read and write access
ADMIN = "admin" # Full administrative access
class SpaceRole(Enum):
"""Predefined roles for space access."""
OWNER = "owner" # Full control, can delete space
ADMIN = "admin" # Can manage members and settings
EDITOR = "editor" # Can edit documents and variables
VIEWER = "viewer" # Read-only access
GUEST = "guest" # Limited read access
@dataclass
class SpacePermission:
"""
Permission entry for a space.
Maps a principal (user/group/role) to an access level for a space.
Attributes:
space_id: The space this permission applies to
principal_type: Type of principal (user, group, role)
principal_id: Identifier for the principal
access_level: The access level granted
role: Optional predefined role
granted_by: Who granted this permission
granted_at: When permission was granted
expires_at: Optional expiration timestamp
"""
space_id: str
principal_type: str # "user", "group", "role"
principal_id: str
access_level: AccessLevel = AccessLevel.READ
role: Optional[SpaceRole] = None
granted_by: Optional[str] = None
granted_at: datetime = field(default_factory=datetime.now)
expires_at: Optional[datetime] = None
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
return {
"space_id": self.space_id,
"principal_type": self.principal_type,
"principal_id": self.principal_id,
"access_level": self.access_level.value,
"role": self.role.value if self.role else None,
"granted_by": self.granted_by,
"granted_at": self.granted_at.isoformat(),
"expires_at": self.expires_at.isoformat() if self.expires_at else None,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "SpacePermission":
"""Create from dictionary."""
granted_at = data.get("granted_at")
if isinstance(granted_at, str):
granted_at = datetime.fromisoformat(granted_at)
elif granted_at is None:
granted_at = datetime.now()
expires_at = data.get("expires_at")
if isinstance(expires_at, str):
expires_at = datetime.fromisoformat(expires_at)
access_level = data.get("access_level", "read")
if isinstance(access_level, str):
access_level = AccessLevel(access_level)
role = data.get("role")
if isinstance(role, str):
role = SpaceRole(role)
return cls(
space_id=data["space_id"],
principal_type=data.get("principal_type", "user"),
principal_id=data["principal_id"],
access_level=access_level,
role=role,
granted_by=data.get("granted_by"),
granted_at=granted_at,
expires_at=expires_at,
)
def is_expired(self) -> bool:
"""Check if permission has expired."""
if self.expires_at is None:
return False
return datetime.now() > self.expires_at
def has_access(self, required_level: AccessLevel) -> bool:
"""Check if this permission grants the required access level."""
if self.is_expired():
return False
level_order = [AccessLevel.NONE, AccessLevel.READ, AccessLevel.WRITE, AccessLevel.ADMIN]
return level_order.index(self.access_level) >= level_order.index(required_level)
@dataclass
class SpaceAccess:
"""
Computed access result for a principal on a space.
Combines all applicable permissions to determine effective access.
Attributes:
space_id: The space being accessed
principal_id: The principal accessing
effective_level: Computed effective access level
roles: All roles that apply
inherited_from: Space IDs from which access was inherited
permissions: Source permissions that contributed
"""
space_id: str
principal_id: str
effective_level: AccessLevel = AccessLevel.NONE
roles: List[SpaceRole] = field(default_factory=list)
inherited_from: List[str] = field(default_factory=list)
permissions: List[SpacePermission] = field(default_factory=list)
def can_read(self) -> bool:
"""Check if principal can read."""
return self.effective_level in [AccessLevel.READ, AccessLevel.WRITE, AccessLevel.ADMIN]
def can_write(self) -> bool:
"""Check if principal can write."""
return self.effective_level in [AccessLevel.WRITE, AccessLevel.ADMIN]
def is_admin(self) -> bool:
"""Check if principal has admin access."""
return self.effective_level == AccessLevel.ADMIN
@dataclass
class InheritedVariable:
"""
A variable with its inheritance source.
Attributes:
name: Variable name
value: Variable value
source_space_id: Space where this variable is defined
inheritance_depth: How many levels up the inheritance chain (0 = local)
scope: Variable scope
"""
name: str
value: Any
source_space_id: str
inheritance_depth: int = 0
scope: str = "space"
def is_local(self) -> bool:
"""Check if variable is locally defined (not inherited)."""
return self.inheritance_depth == 0
@dataclass
class InheritedConfig:
"""
Configuration with inheritance tracking.
Attributes:
default_variant: Effective default variant
enable_caching: Effective caching setting
theme: Effective theme
history_enabled: Effective history setting
variable_scope: Effective variable scope
source_spaces: Map of config key to source space ID
"""
default_variant: str = "hierarchical"
enable_caching: bool = True
theme: Optional[str] = None
history_enabled: bool = False
variable_scope: str = "space"
source_spaces: Dict[str, str] = field(default_factory=dict)
def get_source(self, key: str) -> Optional[str]:
"""Get the source space ID for a config key."""
return self.source_spaces.get(key)
def is_inherited(self, key: str, current_space_id: str) -> bool:
"""Check if a config key is inherited from a parent."""
source = self.source_spaces.get(key)
return source is not None and source != current_space_id

View File

@@ -0,0 +1,424 @@
"""
Repository implementations for space composability.
Provides storage for space references and access control.
"""
import json
import sqlite3
from abc import ABC, abstractmethod
from pathlib import Path
from typing import List, Optional
from datetime import datetime
from .models import (
SpaceReference,
SpaceReferenceType,
SpacePermission,
AccessLevel,
SpaceRole,
)
class ISpaceReferenceRepository(ABC):
"""Interface for space reference storage."""
@abstractmethod
def add_reference(self, reference: SpaceReference) -> SpaceReference:
"""Add a space reference."""
pass
@abstractmethod
def get_reference(self, reference_id: str) -> Optional[SpaceReference]:
"""Get a reference by ID."""
pass
@abstractmethod
def get_references_from(
self, source_space_id: str, ref_type: Optional[SpaceReferenceType] = None
) -> List[SpaceReference]:
"""Get all references from a source space."""
pass
@abstractmethod
def get_references_to(
self, target_space_id: str, ref_type: Optional[SpaceReferenceType] = None
) -> List[SpaceReference]:
"""Get all references to a target space."""
pass
@abstractmethod
def remove_reference(self, reference_id: str) -> bool:
"""Remove a reference."""
pass
@abstractmethod
def remove_references_from(self, source_space_id: str) -> int:
"""Remove all references from a source space."""
pass
@abstractmethod
def reference_exists(
self, source_space_id: str, target_space_id: str, ref_type: SpaceReferenceType
) -> bool:
"""Check if a specific reference exists."""
pass
class IAccessControlRepository(ABC):
"""Interface for access control storage."""
@abstractmethod
def grant_permission(self, permission: SpacePermission) -> SpacePermission:
"""Grant a permission."""
pass
@abstractmethod
def revoke_permission(
self, space_id: str, principal_type: str, principal_id: str
) -> bool:
"""Revoke a permission."""
pass
@abstractmethod
def get_permissions_for_space(self, space_id: str) -> List[SpacePermission]:
"""Get all permissions for a space."""
pass
@abstractmethod
def get_permissions_for_principal(
self, principal_type: str, principal_id: str
) -> List[SpacePermission]:
"""Get all permissions for a principal."""
pass
@abstractmethod
def get_permission(
self, space_id: str, principal_type: str, principal_id: str
) -> Optional[SpacePermission]:
"""Get a specific permission."""
pass
@abstractmethod
def clear_space_permissions(self, space_id: str) -> int:
"""Clear all permissions for a space."""
pass
class SqliteSpaceReferenceRepository(ISpaceReferenceRepository):
"""SQLite implementation of space reference repository."""
def __init__(self, db_path: Path):
"""Initialize the repository.
Args:
db_path: Path to the SQLite database file
"""
self._db_path = db_path
self._init_tables()
def _get_connection(self) -> sqlite3.Connection:
"""Get a database connection."""
conn = sqlite3.connect(str(self._db_path))
conn.row_factory = sqlite3.Row
return conn
def _init_tables(self) -> None:
"""Initialize database tables."""
with self._get_connection() as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS space_references (
id TEXT PRIMARY KEY,
source_space_id TEXT NOT NULL,
target_space_id TEXT NOT NULL,
reference_type TEXT NOT NULL,
alias TEXT,
metadata TEXT,
created_at TEXT NOT NULL,
UNIQUE(source_space_id, target_space_id, reference_type)
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_space_ref_source
ON space_references(source_space_id)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_space_ref_target
ON space_references(target_space_id)
""")
conn.commit()
def add_reference(self, reference: SpaceReference) -> SpaceReference:
"""Add a space reference."""
with self._get_connection() as conn:
conn.execute(
"""
INSERT OR REPLACE INTO space_references
(id, source_space_id, target_space_id, reference_type, alias, metadata, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(
reference.id,
reference.source_space_id,
reference.target_space_id,
reference.reference_type.value,
reference.alias,
json.dumps(reference.metadata),
reference.created_at.isoformat(),
),
)
conn.commit()
return reference
def get_reference(self, reference_id: str) -> Optional[SpaceReference]:
"""Get a reference by ID."""
with self._get_connection() as conn:
row = conn.execute(
"SELECT * FROM space_references WHERE id = ?", (reference_id,)
).fetchone()
if row:
return self._row_to_reference(row)
return None
def get_references_from(
self, source_space_id: str, ref_type: Optional[SpaceReferenceType] = None
) -> List[SpaceReference]:
"""Get all references from a source space."""
with self._get_connection() as conn:
if ref_type:
rows = conn.execute(
"""
SELECT * FROM space_references
WHERE source_space_id = ? AND reference_type = ?
ORDER BY created_at
""",
(source_space_id, ref_type.value),
).fetchall()
else:
rows = conn.execute(
"""
SELECT * FROM space_references
WHERE source_space_id = ?
ORDER BY created_at
""",
(source_space_id,),
).fetchall()
return [self._row_to_reference(row) for row in rows]
def get_references_to(
self, target_space_id: str, ref_type: Optional[SpaceReferenceType] = None
) -> List[SpaceReference]:
"""Get all references to a target space."""
with self._get_connection() as conn:
if ref_type:
rows = conn.execute(
"""
SELECT * FROM space_references
WHERE target_space_id = ? AND reference_type = ?
ORDER BY created_at
""",
(target_space_id, ref_type.value),
).fetchall()
else:
rows = conn.execute(
"""
SELECT * FROM space_references
WHERE target_space_id = ?
ORDER BY created_at
""",
(target_space_id,),
).fetchall()
return [self._row_to_reference(row) for row in rows]
def remove_reference(self, reference_id: str) -> bool:
"""Remove a reference."""
with self._get_connection() as conn:
cursor = conn.execute(
"DELETE FROM space_references WHERE id = ?", (reference_id,)
)
conn.commit()
return cursor.rowcount > 0
def remove_references_from(self, source_space_id: str) -> int:
"""Remove all references from a source space."""
with self._get_connection() as conn:
cursor = conn.execute(
"DELETE FROM space_references WHERE source_space_id = ?",
(source_space_id,),
)
conn.commit()
return cursor.rowcount
def reference_exists(
self, source_space_id: str, target_space_id: str, ref_type: SpaceReferenceType
) -> bool:
"""Check if a specific reference exists."""
with self._get_connection() as conn:
row = conn.execute(
"""
SELECT 1 FROM space_references
WHERE source_space_id = ? AND target_space_id = ? AND reference_type = ?
""",
(source_space_id, target_space_id, ref_type.value),
).fetchone()
return row is not None
def _row_to_reference(self, row: sqlite3.Row) -> SpaceReference:
"""Convert a database row to a SpaceReference."""
return SpaceReference(
id=row["id"],
source_space_id=row["source_space_id"],
target_space_id=row["target_space_id"],
reference_type=SpaceReferenceType(row["reference_type"]),
alias=row["alias"],
metadata=json.loads(row["metadata"]) if row["metadata"] else {},
created_at=datetime.fromisoformat(row["created_at"]),
)
class SqliteAccessControlRepository(IAccessControlRepository):
"""SQLite implementation of access control repository."""
def __init__(self, db_path: Path):
"""Initialize the repository.
Args:
db_path: Path to the SQLite database file
"""
self._db_path = db_path
self._init_tables()
def _get_connection(self) -> sqlite3.Connection:
"""Get a database connection."""
conn = sqlite3.connect(str(self._db_path))
conn.row_factory = sqlite3.Row
return conn
def _init_tables(self) -> None:
"""Initialize database tables."""
with self._get_connection() as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS space_permissions (
space_id TEXT NOT NULL,
principal_type TEXT NOT NULL,
principal_id TEXT NOT NULL,
access_level TEXT NOT NULL,
role TEXT,
granted_by TEXT,
granted_at TEXT NOT NULL,
expires_at TEXT,
PRIMARY KEY(space_id, principal_type, principal_id)
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_perm_principal
ON space_permissions(principal_type, principal_id)
""")
conn.commit()
def grant_permission(self, permission: SpacePermission) -> SpacePermission:
"""Grant a permission."""
with self._get_connection() as conn:
conn.execute(
"""
INSERT OR REPLACE INTO space_permissions
(space_id, principal_type, principal_id, access_level, role,
granted_by, granted_at, expires_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
permission.space_id,
permission.principal_type,
permission.principal_id,
permission.access_level.value,
permission.role.value if permission.role else None,
permission.granted_by,
permission.granted_at.isoformat(),
permission.expires_at.isoformat() if permission.expires_at else None,
),
)
conn.commit()
return permission
def revoke_permission(
self, space_id: str, principal_type: str, principal_id: str
) -> bool:
"""Revoke a permission."""
with self._get_connection() as conn:
cursor = conn.execute(
"""
DELETE FROM space_permissions
WHERE space_id = ? AND principal_type = ? AND principal_id = ?
""",
(space_id, principal_type, principal_id),
)
conn.commit()
return cursor.rowcount > 0
def get_permissions_for_space(self, space_id: str) -> List[SpacePermission]:
"""Get all permissions for a space."""
with self._get_connection() as conn:
rows = conn.execute(
"SELECT * FROM space_permissions WHERE space_id = ?", (space_id,)
).fetchall()
return [self._row_to_permission(row) for row in rows]
def get_permissions_for_principal(
self, principal_type: str, principal_id: str
) -> List[SpacePermission]:
"""Get all permissions for a principal."""
with self._get_connection() as conn:
rows = conn.execute(
"""
SELECT * FROM space_permissions
WHERE principal_type = ? AND principal_id = ?
""",
(principal_type, principal_id),
).fetchall()
return [self._row_to_permission(row) for row in rows]
def get_permission(
self, space_id: str, principal_type: str, principal_id: str
) -> Optional[SpacePermission]:
"""Get a specific permission."""
with self._get_connection() as conn:
row = conn.execute(
"""
SELECT * FROM space_permissions
WHERE space_id = ? AND principal_type = ? AND principal_id = ?
""",
(space_id, principal_type, principal_id),
).fetchone()
if row:
return self._row_to_permission(row)
return None
def clear_space_permissions(self, space_id: str) -> int:
"""Clear all permissions for a space."""
with self._get_connection() as conn:
cursor = conn.execute(
"DELETE FROM space_permissions WHERE space_id = ?", (space_id,)
)
conn.commit()
return cursor.rowcount
def _row_to_permission(self, row: sqlite3.Row) -> SpacePermission:
"""Convert a database row to a SpacePermission."""
expires_at = row["expires_at"]
if expires_at:
expires_at = datetime.fromisoformat(expires_at)
role = row["role"]
if role:
role = SpaceRole(role)
return SpacePermission(
space_id=row["space_id"],
principal_type=row["principal_type"],
principal_id=row["principal_id"],
access_level=AccessLevel(row["access_level"]),
role=role,
granted_by=row["granted_by"],
granted_at=datetime.fromisoformat(row["granted_at"]),
expires_at=expires_at,
)

View File

@@ -0,0 +1,825 @@
"""
Service layer for space composability.
Provides inheritance resolution, reference management, and access control.
"""
from typing import List, Optional, Dict, Any, Set
from dataclasses import dataclass
from ..models import InformationSpace, SpaceVariable, SpaceConfig
from ..repositories.interfaces import ISpaceRepository, IVariableRepository
from .models import (
SpaceReference,
SpaceReferenceType,
SpacePermission,
AccessLevel,
SpaceRole,
SpaceAccess,
InheritedVariable,
InheritedConfig,
)
from .repository import ISpaceReferenceRepository, IAccessControlRepository
class CircularReferenceError(Exception):
"""Raised when a circular reference is detected."""
pass
class InheritanceResolver:
"""
Resolves inherited values through the space hierarchy.
Handles variable inheritance, config inheritance, and reference traversal.
"""
def __init__(
self,
space_repo: ISpaceRepository,
variable_repo: IVariableRepository,
reference_repo: Optional[ISpaceReferenceRepository] = None,
max_depth: int = 10,
):
"""
Initialize the resolver.
Args:
space_repo: Repository for space lookups
variable_repo: Repository for variable lookups
reference_repo: Optional repository for reference lookups
max_depth: Maximum inheritance depth to prevent infinite loops
"""
self._space_repo = space_repo
self._variable_repo = variable_repo
self._reference_repo = reference_repo
self._max_depth = max_depth
def get_parent_chain(
self, space_id: str, visited: Optional[Set[str]] = None
) -> List[InformationSpace]:
"""
Get the chain of parent spaces.
Args:
space_id: Starting space ID
visited: Set of already-visited space IDs (for cycle detection)
Returns:
List of parent spaces, from immediate parent to root
Raises:
CircularReferenceError: If a cycle is detected
"""
if visited is None:
visited = set()
if space_id in visited:
raise CircularReferenceError(
f"Circular reference detected: space {space_id} already in chain"
)
visited.add(space_id)
space = self._space_repo.get_by_id(space_id)
if not space or not space.parent_space_id:
return []
if len(visited) > self._max_depth:
raise CircularReferenceError(
f"Maximum inheritance depth ({self._max_depth}) exceeded"
)
parent = self._space_repo.get_by_id(space.parent_space_id)
if not parent:
return []
return [parent] + self.get_parent_chain(parent.id, visited)
def resolve_variable(
self, space_id: str, name: str
) -> Optional[InheritedVariable]:
"""
Resolve a variable, checking parent spaces if not found locally.
Args:
space_id: The space to start resolution from
name: Variable name to resolve
Returns:
InheritedVariable with value and source, or None if not found
"""
# Check local space first
local_var = self._variable_repo.get_variable(space_id, name)
if local_var:
return InheritedVariable(
name=name,
value=local_var.value,
source_space_id=space_id,
inheritance_depth=0,
scope=local_var.scope,
)
# Walk up parent chain
try:
parents = self.get_parent_chain(space_id)
except CircularReferenceError:
return None
for depth, parent in enumerate(parents, start=1):
parent_var = self._variable_repo.get_variable(parent.id, name)
if parent_var:
return InheritedVariable(
name=name,
value=parent_var.value,
source_space_id=parent.id,
inheritance_depth=depth,
scope=parent_var.scope,
)
return None
def resolve_all_variables(
self, space_id: str, scope: Optional[str] = None
) -> Dict[str, InheritedVariable]:
"""
Resolve all variables for a space, including inherited ones.
Variables defined locally override inherited ones with the same name.
Args:
space_id: The space ID
scope: Optional scope filter
Returns:
Dictionary of variable name to InheritedVariable
"""
result: Dict[str, InheritedVariable] = {}
# Start from the furthest ancestor and work down
try:
parents = self.get_parent_chain(space_id)
except CircularReferenceError:
parents = []
# Build list from root to current: [grandparent, parent, current]
# parents is [parent, grandparent] so reverse it
current_space = self._space_repo.get_by_id(space_id)
all_spaces = list(reversed(parents))
if current_space:
all_spaces.append(current_space)
# Process from root to current so local values override inherited ones
# Index 0 is root (highest depth), last index is current (depth 0)
total = len(all_spaces)
for i, space in enumerate(all_spaces):
# depth_from_current: current=0, parent=1, grandparent=2, etc.
depth_from_current = total - 1 - i
variables = self._variable_repo.list_variables(space.id, scope)
for var in variables:
result[var.name] = InheritedVariable(
name=var.name,
value=var.value,
source_space_id=space.id,
inheritance_depth=depth_from_current,
scope=var.scope,
)
return result
def resolve_config(self, space_id: str) -> InheritedConfig:
"""
Resolve the effective configuration for a space.
Config values are inherited from parents unless overridden locally.
Args:
space_id: The space ID
Returns:
InheritedConfig with effective values and sources
"""
space = self._space_repo.get_by_id(space_id)
if not space:
return InheritedConfig()
# Start with defaults
result = InheritedConfig(
default_variant="hierarchical",
enable_caching=True,
theme=None,
history_enabled=False,
variable_scope="space",
source_spaces={},
)
# Get parent chain
try:
parents = self.get_parent_chain(space_id)
except CircularReferenceError:
parents = []
# Process from root to current (reversed order)
all_spaces = list(reversed(parents)) + [space]
for s in all_spaces:
config = s.config
if config.default_variant:
result.default_variant = config.default_variant
result.source_spaces["default_variant"] = s.id
if config.enable_caching is not None:
result.enable_caching = config.enable_caching
result.source_spaces["enable_caching"] = s.id
if config.theme:
result.theme = config.theme
result.source_spaces["theme"] = s.id
if config.history_enabled is not None:
result.history_enabled = config.history_enabled
result.source_spaces["history_enabled"] = s.id
if config.variable_scope:
result.variable_scope = config.variable_scope
result.source_spaces["variable_scope"] = s.id
return result
class AccessControlService:
"""
Service for managing space access control.
Handles permission grants, checks, and inheritance.
"""
def __init__(
self,
permission_repo: IAccessControlRepository,
space_repo: ISpaceRepository,
inheritance_resolver: Optional[InheritanceResolver] = None,
):
"""
Initialize the service.
Args:
permission_repo: Repository for permission storage
space_repo: Repository for space lookups
inheritance_resolver: Optional resolver for inherited permissions
"""
self._permission_repo = permission_repo
self._space_repo = space_repo
self._inheritance_resolver = inheritance_resolver
def grant_permission(
self,
space_id: str,
principal_type: str,
principal_id: str,
access_level: AccessLevel,
role: Optional[SpaceRole] = None,
granted_by: Optional[str] = None,
) -> SpacePermission:
"""
Grant a permission to a principal.
Args:
space_id: The space ID
principal_type: Type of principal (user, group, role)
principal_id: Principal identifier
access_level: Access level to grant
role: Optional role assignment
granted_by: Who is granting the permission
Returns:
The created permission
"""
permission = SpacePermission(
space_id=space_id,
principal_type=principal_type,
principal_id=principal_id,
access_level=access_level,
role=role,
granted_by=granted_by,
)
return self._permission_repo.grant_permission(permission)
def revoke_permission(
self, space_id: str, principal_type: str, principal_id: str
) -> bool:
"""
Revoke a permission.
Args:
space_id: The space ID
principal_type: Type of principal
principal_id: Principal identifier
Returns:
True if revoked, False if not found
"""
return self._permission_repo.revoke_permission(
space_id, principal_type, principal_id
)
def check_access(
self,
space_id: str,
principal_type: str,
principal_id: str,
required_level: AccessLevel = AccessLevel.READ,
) -> bool:
"""
Check if a principal has the required access level.
Args:
space_id: The space ID
principal_type: Type of principal
principal_id: Principal identifier
required_level: Required access level
Returns:
True if access is granted
"""
access = self.get_effective_access(space_id, principal_type, principal_id)
if access.effective_level == AccessLevel.NONE:
return False
# Compare using level order, not string values
level_order = [AccessLevel.NONE, AccessLevel.READ, AccessLevel.WRITE, AccessLevel.ADMIN]
return level_order.index(access.effective_level) >= level_order.index(required_level)
def get_effective_access(
self, space_id: str, principal_type: str, principal_id: str
) -> SpaceAccess:
"""
Get the effective access for a principal on a space.
Considers direct permissions and inherited permissions from parent spaces.
Args:
space_id: The space ID
principal_type: Type of principal
principal_id: Principal identifier
Returns:
SpaceAccess with computed effective access
"""
result = SpaceAccess(
space_id=space_id,
principal_id=principal_id,
effective_level=AccessLevel.NONE,
roles=[],
inherited_from=[],
permissions=[],
)
# Check direct permission
direct = self._permission_repo.get_permission(
space_id, principal_type, principal_id
)
if direct and not direct.is_expired():
result.permissions.append(direct)
result.effective_level = direct.access_level
if direct.role:
result.roles.append(direct.role)
# Check inherited permissions if we have a resolver
if self._inheritance_resolver:
try:
parents = self._inheritance_resolver.get_parent_chain(space_id)
except CircularReferenceError:
parents = []
for parent in parents:
parent_perm = self._permission_repo.get_permission(
parent.id, principal_type, principal_id
)
if parent_perm and not parent_perm.is_expired():
result.permissions.append(parent_perm)
result.inherited_from.append(parent.id)
# Take the highest access level
level_order = [
AccessLevel.NONE,
AccessLevel.READ,
AccessLevel.WRITE,
AccessLevel.ADMIN,
]
if level_order.index(parent_perm.access_level) > level_order.index(
result.effective_level
):
result.effective_level = parent_perm.access_level
if parent_perm.role and parent_perm.role not in result.roles:
result.roles.append(parent_perm.role)
return result
def get_space_permissions(self, space_id: str) -> List[SpacePermission]:
"""
Get all permissions for a space.
Args:
space_id: The space ID
Returns:
List of permissions
"""
return self._permission_repo.get_permissions_for_space(space_id)
def get_principal_permissions(
self, principal_type: str, principal_id: str
) -> List[SpacePermission]:
"""
Get all permissions for a principal.
Args:
principal_type: Type of principal
principal_id: Principal identifier
Returns:
List of permissions
"""
return self._permission_repo.get_permissions_for_principal(
principal_type, principal_id
)
class ComposableSpaceService:
"""
Extended space service with composability features.
Provides space references, inheritance resolution, and access control
on top of the base SpaceService functionality.
"""
def __init__(
self,
space_repo: ISpaceRepository,
variable_repo: IVariableRepository,
reference_repo: ISpaceReferenceRepository,
permission_repo: IAccessControlRepository,
max_inheritance_depth: int = 10,
):
"""
Initialize the service.
Args:
space_repo: Repository for spaces
variable_repo: Repository for variables
reference_repo: Repository for space references
permission_repo: Repository for permissions
max_inheritance_depth: Maximum inheritance chain depth
"""
self._space_repo = space_repo
self._variable_repo = variable_repo
self._reference_repo = reference_repo
self._permission_repo = permission_repo
self._inheritance_resolver = InheritanceResolver(
space_repo=space_repo,
variable_repo=variable_repo,
reference_repo=reference_repo,
max_depth=max_inheritance_depth,
)
self._access_service = AccessControlService(
permission_repo=permission_repo,
space_repo=space_repo,
inheritance_resolver=self._inheritance_resolver,
)
# =========================================================================
# Space References
# =========================================================================
def add_reference(
self,
source_space_id: str,
target_space_id: str,
reference_type: SpaceReferenceType = SpaceReferenceType.LINKS_TO,
alias: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> SpaceReference:
"""
Add a reference from one space to another.
Args:
source_space_id: The space making the reference
target_space_id: The space being referenced
reference_type: Type of reference
alias: Optional alias for the target space
metadata: Optional metadata
Returns:
The created reference
Raises:
ValueError: If spaces don't exist or reference would create a cycle
CircularReferenceError: If reference would create a circular dependency
"""
# Validate spaces exist
if not self._space_repo.exists(source_space_id):
raise ValueError(f"Source space '{source_space_id}' not found")
if not self._space_repo.exists(target_space_id):
raise ValueError(f"Target space '{target_space_id}' not found")
# Check for self-reference
if source_space_id == target_space_id:
raise ValueError("Cannot reference self")
# Check for circular reference (for EXTENDS type)
if reference_type == SpaceReferenceType.EXTENDS:
self._check_circular_reference(source_space_id, target_space_id)
reference = SpaceReference(
source_space_id=source_space_id,
target_space_id=target_space_id,
reference_type=reference_type,
alias=alias,
metadata=metadata or {},
)
return self._reference_repo.add_reference(reference)
def _check_circular_reference(
self, source_id: str, target_id: str, visited: Optional[Set[str]] = None
) -> None:
"""Check if adding a reference would create a cycle."""
if visited is None:
visited = {source_id}
if target_id in visited:
raise CircularReferenceError(
f"Adding reference would create cycle: {target_id} already in chain"
)
visited.add(target_id)
# Check existing references from target
refs = self._reference_repo.get_references_from(
target_id, SpaceReferenceType.EXTENDS
)
for ref in refs:
self._check_circular_reference(source_id, ref.target_space_id, visited)
# Also check parent_space_id chain
target_space = self._space_repo.get_by_id(target_id)
if target_space and target_space.parent_space_id:
if target_space.parent_space_id in visited:
raise CircularReferenceError(
f"Adding reference would create cycle through parent: "
f"{target_space.parent_space_id}"
)
self._check_circular_reference(
source_id, target_space.parent_space_id, visited
)
def remove_reference(self, reference_id: str) -> bool:
"""
Remove a space reference.
Args:
reference_id: The reference ID
Returns:
True if removed, False if not found
"""
return self._reference_repo.remove_reference(reference_id)
def get_references_from(
self,
space_id: str,
reference_type: Optional[SpaceReferenceType] = None,
) -> List[SpaceReference]:
"""
Get all references from a space.
Args:
space_id: The source space ID
reference_type: Optional filter by type
Returns:
List of references
"""
return self._reference_repo.get_references_from(space_id, reference_type)
def get_references_to(
self,
space_id: str,
reference_type: Optional[SpaceReferenceType] = None,
) -> List[SpaceReference]:
"""
Get all references to a space.
Args:
space_id: The target space ID
reference_type: Optional filter by type
Returns:
List of references
"""
return self._reference_repo.get_references_to(space_id, reference_type)
def get_composed_spaces(self, space_id: str) -> List[InformationSpace]:
"""
Get all spaces that compose this space.
Includes parent, extended spaces, and included spaces.
Args:
space_id: The space ID
Returns:
List of composed spaces
"""
result: List[InformationSpace] = []
seen: Set[str] = {space_id}
space = self._space_repo.get_by_id(space_id)
if not space:
return []
# Add parent
if space.parent_space_id:
parent = self._space_repo.get_by_id(space.parent_space_id)
if parent and parent.id not in seen:
result.append(parent)
seen.add(parent.id)
# Add extended spaces
extends_refs = self._reference_repo.get_references_from(
space_id, SpaceReferenceType.EXTENDS
)
for ref in extends_refs:
if ref.target_space_id not in seen:
target = self._space_repo.get_by_id(ref.target_space_id)
if target:
result.append(target)
seen.add(ref.target_space_id)
# Add included spaces
includes_refs = self._reference_repo.get_references_from(
space_id, SpaceReferenceType.INCLUDES
)
for ref in includes_refs:
if ref.target_space_id not in seen:
target = self._space_repo.get_by_id(ref.target_space_id)
if target:
result.append(target)
seen.add(ref.target_space_id)
return result
# =========================================================================
# Inheritance Resolution
# =========================================================================
def resolve_variable(
self, space_id: str, name: str
) -> Optional[InheritedVariable]:
"""
Resolve a variable with inheritance.
Args:
space_id: The space ID
name: Variable name
Returns:
InheritedVariable or None if not found
"""
return self._inheritance_resolver.resolve_variable(space_id, name)
def resolve_all_variables(
self, space_id: str, scope: Optional[str] = None
) -> Dict[str, InheritedVariable]:
"""
Resolve all variables including inherited ones.
Args:
space_id: The space ID
scope: Optional scope filter
Returns:
Dictionary of variable name to InheritedVariable
"""
return self._inheritance_resolver.resolve_all_variables(space_id, scope)
def resolve_config(self, space_id: str) -> InheritedConfig:
"""
Resolve effective configuration with inheritance.
Args:
space_id: The space ID
Returns:
InheritedConfig with effective values
"""
return self._inheritance_resolver.resolve_config(space_id)
def get_parent_chain(self, space_id: str) -> List[InformationSpace]:
"""
Get the parent space chain.
Args:
space_id: The space ID
Returns:
List of parent spaces from immediate parent to root
"""
return self._inheritance_resolver.get_parent_chain(space_id)
# =========================================================================
# Access Control
# =========================================================================
def grant_access(
self,
space_id: str,
principal_type: str,
principal_id: str,
access_level: AccessLevel,
role: Optional[SpaceRole] = None,
granted_by: Optional[str] = None,
) -> SpacePermission:
"""
Grant access to a space.
Args:
space_id: The space ID
principal_type: Type of principal (user, group)
principal_id: Principal identifier
access_level: Access level to grant
role: Optional role
granted_by: Who is granting access
Returns:
The created permission
"""
return self._access_service.grant_permission(
space_id, principal_type, principal_id, access_level, role, granted_by
)
def revoke_access(
self, space_id: str, principal_type: str, principal_id: str
) -> bool:
"""
Revoke access to a space.
Args:
space_id: The space ID
principal_type: Type of principal
principal_id: Principal identifier
Returns:
True if revoked
"""
return self._access_service.revoke_permission(
space_id, principal_type, principal_id
)
def check_access(
self,
space_id: str,
principal_type: str,
principal_id: str,
required_level: AccessLevel = AccessLevel.READ,
) -> bool:
"""
Check if a principal has required access.
Args:
space_id: The space ID
principal_type: Type of principal
principal_id: Principal identifier
required_level: Required access level
Returns:
True if access is granted
"""
return self._access_service.check_access(
space_id, principal_type, principal_id, required_level
)
def get_effective_access(
self, space_id: str, principal_type: str, principal_id: str
) -> SpaceAccess:
"""
Get effective access including inheritance.
Args:
space_id: The space ID
principal_type: Type of principal
principal_id: Principal identifier
Returns:
SpaceAccess with computed access
"""
return self._access_service.get_effective_access(
space_id, principal_type, principal_id
)
def get_space_permissions(self, space_id: str) -> List[SpacePermission]:
"""Get all permissions for a space."""
return self._access_service.get_space_permissions(space_id)

File diff suppressed because it is too large Load Diff