CMIS compliance/test foundation

This commit is contained in:
2026-05-07 00:35:33 +02:00
parent 28420c68d1
commit 241522e74d
14 changed files with 1080 additions and 12 deletions

View File

@@ -0,0 +1,427 @@
"""CMIS access-point profile primitives.
These classes model how a CMIS adapter may expose engine capabilities without
turning CMIS into a second domain model.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
from .actors import ActorType, OperationContext
from .assets import KnowledgeAsset
from .metadata import Sensitivity
from .policy import PolicyDecision
from .primitives import compact_dict
class CMISBinding(str, Enum):
BROWSER = "browser"
ATOMPUB = "atompub"
WEB_SERVICES = "web_services"
class CMISCapability(str, Enum):
REPOSITORY = "repository"
TYPE_DEFINITIONS = "type_definitions"
NAVIGATION = "navigation"
OBJECT_READ = "object_read"
OBJECT_WRITE = "object_write"
CONTENT_STREAM_READ = "content_stream_read"
CONTENT_STREAM_WRITE = "content_stream_write"
VERSIONING = "versioning"
DISCOVERY_QUERY = "discovery_query"
RELATIONSHIPS = "relationships"
ACL = "acl"
POLICY = "policy"
CHANGE_LOG = "change_log"
RENDITIONS = "renditions"
RETENTION_HOLD = "retention_hold"
BULK_UPDATE = "bulk_update"
class CMISAction(str, Enum):
GET_REPOSITORY_INFO = "get_repository_info"
GET_TYPE_DEFINITION = "get_type_definition"
GET_CHILDREN = "get_children"
GET_OBJECT = "get_object"
GET_CONTENT_STREAM = "get_content_stream"
QUERY = "query"
GET_RELATIONSHIPS = "get_relationships"
GET_CHANGE_LOG = "get_change_log"
CREATE_DOCUMENT = "create_document"
UPDATE_PROPERTIES = "update_properties"
DELETE_OBJECT = "delete_object"
SET_CONTENT_STREAM = "set_content_stream"
APPLY_ACL = "apply_acl"
APPLY_POLICY = "apply_policy"
BULK_UPDATE_PROPERTIES = "bulk_update_properties"
ACTION_CAPABILITIES: dict[CMISAction, CMISCapability] = {
CMISAction.GET_REPOSITORY_INFO: CMISCapability.REPOSITORY,
CMISAction.GET_TYPE_DEFINITION: CMISCapability.TYPE_DEFINITIONS,
CMISAction.GET_CHILDREN: CMISCapability.NAVIGATION,
CMISAction.GET_OBJECT: CMISCapability.OBJECT_READ,
CMISAction.GET_CONTENT_STREAM: CMISCapability.CONTENT_STREAM_READ,
CMISAction.QUERY: CMISCapability.DISCOVERY_QUERY,
CMISAction.GET_RELATIONSHIPS: CMISCapability.RELATIONSHIPS,
CMISAction.GET_CHANGE_LOG: CMISCapability.CHANGE_LOG,
CMISAction.CREATE_DOCUMENT: CMISCapability.OBJECT_WRITE,
CMISAction.UPDATE_PROPERTIES: CMISCapability.OBJECT_WRITE,
CMISAction.DELETE_OBJECT: CMISCapability.OBJECT_WRITE,
CMISAction.SET_CONTENT_STREAM: CMISCapability.CONTENT_STREAM_WRITE,
CMISAction.APPLY_ACL: CMISCapability.ACL,
CMISAction.APPLY_POLICY: CMISCapability.POLICY,
CMISAction.BULK_UPDATE_PROPERTIES: CMISCapability.BULK_UPDATE,
}
MUTATION_ACTIONS = {
CMISAction.CREATE_DOCUMENT,
CMISAction.UPDATE_PROPERTIES,
CMISAction.DELETE_OBJECT,
CMISAction.SET_CONTENT_STREAM,
CMISAction.APPLY_ACL,
CMISAction.APPLY_POLICY,
CMISAction.BULK_UPDATE_PROPERTIES,
}
@dataclass(frozen=True)
class CMISAccessProfile:
name: str
binding: CMISBinding | str = CMISBinding.BROWSER
capabilities: tuple[CMISCapability | str, ...] = ()
allow_mutations: bool = False
visible_sensitivities: tuple[Sensitivity | str, ...] = (Sensitivity.PUBLIC, Sensitivity.INTERNAL)
denied_sensitivities: tuple[Sensitivity | str, ...] = (Sensitivity.CONFIDENTIAL, Sensitivity.RESTRICTED)
visible_asset_types: tuple[str, ...] = ()
visible_topics: tuple[str, ...] = ()
visible_source_systems: tuple[str, ...] = ()
denied_metadata: dict[str, tuple[Any, ...]] = field(default_factory=dict)
required_actor_types: tuple[ActorType | str, ...] = ()
metadata: dict[str, Any] = field(default_factory=dict)
def __post_init__(self) -> None:
object.__setattr__(self, "binding", CMISBinding(self.binding))
object.__setattr__(
self,
"capabilities",
tuple(CMISCapability(capability) for capability in self.capabilities),
)
object.__setattr__(
self,
"visible_sensitivities",
tuple(Sensitivity(value) for value in self.visible_sensitivities),
)
object.__setattr__(
self,
"denied_sensitivities",
tuple(Sensitivity(value) for value in self.denied_sensitivities),
)
object.__setattr__(
self,
"required_actor_types",
tuple(ActorType(value) for value in self.required_actor_types),
)
object.__setattr__(
self,
"denied_metadata",
{key: tuple(values) for key, values in self.denied_metadata.items()},
)
@classmethod
def readonly_browser(cls) -> "CMISAccessProfile":
return cls(
name="readonly-browser",
capabilities=_read_capabilities(),
allow_mutations=False,
)
@classmethod
def governed_authoring(cls) -> "CMISAccessProfile":
return cls(
name="governed-authoring",
capabilities=_read_capabilities()
+ (
CMISCapability.OBJECT_WRITE,
CMISCapability.CONTENT_STREAM_WRITE,
),
allow_mutations=True,
)
@classmethod
def admin_export(cls) -> "CMISAccessProfile":
return cls(
name="admin-export",
capabilities=_read_capabilities()
+ (
CMISCapability.RENDITIONS,
CMISCapability.RETENTION_HOLD,
),
allow_mutations=False,
visible_sensitivities=(
Sensitivity.PUBLIC,
Sensitivity.INTERNAL,
Sensitivity.CONFIDENTIAL,
Sensitivity.RESTRICTED,
),
denied_sensitivities=(),
required_actor_types=(ActorType.SERVICE_ACCOUNT,),
)
@classmethod
def compat_tck(cls) -> "CMISAccessProfile":
return cls(
name="compat-tck",
capabilities=_read_capabilities()
+ (
CMISCapability.OBJECT_WRITE,
CMISCapability.CONTENT_STREAM_WRITE,
),
allow_mutations=True,
metadata={"compatibility": "selected-opencmis-tck-browser-subset"},
)
def has_capability(self, capability: CMISCapability | str) -> bool:
return CMISCapability(capability) in self.capabilities
def allows_action(self, action: CMISAction | str) -> bool:
cmis_action = CMISAction(action)
capability = ACTION_CAPABILITIES[cmis_action]
if not self.has_capability(capability):
return False
if cmis_action in MUTATION_ACTIONS and not self.allow_mutations:
return False
return True
def decide_action(
self,
action: CMISAction | str,
context: OperationContext,
*,
resource: str = "cmis:repository",
) -> PolicyDecision:
cmis_action = CMISAction(action)
if not self.allows_actor(context):
return PolicyDecision.deny(
context.actor.id,
cmis_action.value,
resource,
reason="actor_type_not_allowed_for_cmis_profile",
context={"profile": self.name},
)
if self.allows_action(cmis_action):
return PolicyDecision.allow(
context.actor.id,
cmis_action.value,
resource,
context={"profile": self.name},
)
capability = ACTION_CAPABILITIES[cmis_action]
reason = "cmis_mutation_not_allowed" if cmis_action in MUTATION_ACTIONS else "cmis_capability_not_supported"
return PolicyDecision.deny(
context.actor.id,
cmis_action.value,
resource,
reason=reason,
context={"profile": self.name, "capability": capability.value},
)
def allows_actor(self, context: OperationContext) -> bool:
return not self.required_actor_types or context.actor.actor_type in self.required_actor_types
def exposes_asset(self, asset: KnowledgeAsset, context: OperationContext) -> bool:
return self.decide_asset_visibility(asset, context).allowed
def decide_asset_visibility(
self,
asset: KnowledgeAsset,
context: OperationContext,
) -> PolicyDecision:
resource = f"asset:{asset.id}"
if not self.allows_actor(context):
return PolicyDecision.deny(
context.actor.id,
"cmis.expose_asset",
resource,
reason="actor_type_not_allowed_for_cmis_profile",
context={"profile": self.name},
)
classification = asset.classification
if classification.sensitivity in self.denied_sensitivities:
return PolicyDecision.deny(
context.actor.id,
"cmis.expose_asset",
resource,
reason="cmis_sensitivity_denied",
context={"profile": self.name, "sensitivity": _enum_value(classification.sensitivity)},
)
if classification.sensitivity not in self.visible_sensitivities:
return PolicyDecision.deny(
context.actor.id,
"cmis.expose_asset",
resource,
reason="cmis_sensitivity_not_visible",
context={"profile": self.name, "sensitivity": _enum_value(classification.sensitivity)},
)
if self.visible_asset_types and classification.asset_type not in self.visible_asset_types:
return PolicyDecision.deny(
context.actor.id,
"cmis.expose_asset",
resource,
reason="cmis_asset_type_not_visible",
context={"profile": self.name, "asset_type": classification.asset_type},
)
if self.visible_topics and not set(classification.topics).intersection(self.visible_topics):
return PolicyDecision.deny(
context.actor.id,
"cmis.expose_asset",
resource,
reason="cmis_topic_not_visible",
context={"profile": self.name, "topics": list(classification.topics)},
)
source_system = asset.metadata.get("source_system") or classification.metadata.get("source_system")
if self.visible_source_systems and source_system not in self.visible_source_systems:
return PolicyDecision.deny(
context.actor.id,
"cmis.expose_asset",
resource,
reason="cmis_source_system_not_visible",
context={"profile": self.name, "source_system": source_system},
)
for key, denied_values in self.denied_metadata.items():
value = asset.metadata.get(key, classification.metadata.get(key))
if value in denied_values:
return PolicyDecision.deny(
context.actor.id,
"cmis.expose_asset",
resource,
reason="cmis_metadata_denied",
context={"profile": self.name, "metadata_key": key},
)
return PolicyDecision.allow(
context.actor.id,
"cmis.expose_asset",
resource,
context={"profile": self.name},
)
def to_dict(self) -> dict[str, Any]:
return compact_dict(
{
"name": self.name,
"binding": self.binding.value,
"capabilities": [capability.value for capability in self.capabilities],
"allow_mutations": self.allow_mutations,
"visible_sensitivities": [item.value for item in self.visible_sensitivities],
"denied_sensitivities": [item.value for item in self.denied_sensitivities],
"visible_asset_types": list(self.visible_asset_types),
"visible_topics": list(self.visible_topics),
"visible_source_systems": list(self.visible_source_systems),
"denied_metadata": {key: list(values) for key, values in self.denied_metadata.items()},
"required_actor_types": [item.value for item in self.required_actor_types],
"metadata": dict(self.metadata),
}
)
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "CMISAccessProfile":
return cls(
name=data["name"],
binding=data.get("binding", CMISBinding.BROWSER.value),
capabilities=tuple(data.get("capabilities", [])),
allow_mutations=bool(data.get("allow_mutations", False)),
visible_sensitivities=tuple(data.get("visible_sensitivities", [])),
denied_sensitivities=tuple(data.get("denied_sensitivities", [])),
visible_asset_types=tuple(data.get("visible_asset_types", [])),
visible_topics=tuple(data.get("visible_topics", [])),
visible_source_systems=tuple(data.get("visible_source_systems", [])),
denied_metadata=dict(data.get("denied_metadata", {})),
required_actor_types=tuple(data.get("required_actor_types", [])),
metadata=dict(data.get("metadata", {})),
)
@dataclass(frozen=True)
class CMISAccessPoint:
access_point_id: str
repository_id: str
profile: CMISAccessProfile
base_path: str
root_folder_id: str = "cmis-root"
enabled: bool = True
metadata: dict[str, Any] = field(default_factory=dict)
def __post_init__(self) -> None:
normalized = "/" + self.base_path.strip("/")
object.__setattr__(self, "base_path", normalized)
def decide_action(
self,
action: CMISAction | str,
context: OperationContext,
*,
resource: str | None = None,
) -> PolicyDecision:
if not self.enabled:
return PolicyDecision.deny(
context.actor.id,
CMISAction(action).value,
resource or f"cmis:{self.repository_id}",
reason="cmis_access_point_disabled",
context={"access_point_id": self.access_point_id, "profile": self.profile.name},
)
return self.profile.decide_action(
action,
context,
resource=resource or f"cmis:{self.repository_id}",
)
def exposes_asset(self, asset: KnowledgeAsset, context: OperationContext) -> bool:
return self.enabled and self.profile.exposes_asset(asset, context)
def to_dict(self) -> dict[str, Any]:
return compact_dict(
{
"access_point_id": self.access_point_id,
"repository_id": self.repository_id,
"profile": self.profile.to_dict(),
"base_path": self.base_path,
"root_folder_id": self.root_folder_id,
"enabled": self.enabled,
"metadata": dict(self.metadata),
}
)
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "CMISAccessPoint":
return cls(
access_point_id=data["access_point_id"],
repository_id=data["repository_id"],
profile=CMISAccessProfile.from_dict(data["profile"]),
base_path=data["base_path"],
root_folder_id=data.get("root_folder_id", "cmis-root"),
enabled=bool(data.get("enabled", True)),
metadata=dict(data.get("metadata", {})),
)
def _read_capabilities() -> tuple[CMISCapability, ...]:
return (
CMISCapability.REPOSITORY,
CMISCapability.TYPE_DEFINITIONS,
CMISCapability.NAVIGATION,
CMISCapability.OBJECT_READ,
CMISCapability.CONTENT_STREAM_READ,
CMISCapability.VERSIONING,
CMISCapability.DISCOVERY_QUERY,
CMISCapability.RELATIONSHIPS,
CMISCapability.ACL,
CMISCapability.CHANGE_LOG,
)
def _enum_value(value: Any) -> Any:
return getattr(value, "value", value)