"""CMIS access-point profile primitives. These classes model how a CMIS adapter may expose engine capabilities without turning CMIS into a second domain model. """ from __future__ import annotations from dataclasses import dataclass, field from enum import Enum from typing import Any from .actors import ActorType, OperationContext from .assets import AssetRepresentation, KnowledgeAsset, RepresentationKind from .metadata import LifecycleState, Sensitivity from .policy import PolicyDecision from .provenance import AssetVersion from .relationships import CoreRelationship, RelationshipTargetKind from .primitives import compact_dict class CMISBinding(str, Enum): BROWSER = "browser" ATOMPUB = "atompub" WEB_SERVICES = "web_services" class CMISCapability(str, Enum): REPOSITORY = "repository" TYPE_DEFINITIONS = "type_definitions" NAVIGATION = "navigation" OBJECT_READ = "object_read" OBJECT_WRITE = "object_write" CONTENT_STREAM_READ = "content_stream_read" CONTENT_STREAM_WRITE = "content_stream_write" VERSIONING = "versioning" DISCOVERY_QUERY = "discovery_query" RELATIONSHIPS = "relationships" ACL = "acl" POLICY = "policy" CHANGE_LOG = "change_log" RENDITIONS = "renditions" RETENTION_HOLD = "retention_hold" BULK_UPDATE = "bulk_update" class CMISAction(str, Enum): GET_REPOSITORY_INFO = "get_repository_info" GET_TYPE_DEFINITION = "get_type_definition" GET_CHILDREN = "get_children" GET_OBJECT_PARENTS = "get_object_parents" GET_OBJECT = "get_object" GET_CONTENT_STREAM = "get_content_stream" GET_ACL = "get_acl" QUERY = "query" GET_RELATIONSHIPS = "get_relationships" GET_CHANGE_LOG = "get_change_log" CREATE_FOLDER = "create_folder" CREATE_DOCUMENT = "create_document" UPDATE_PROPERTIES = "update_properties" DELETE_OBJECT = "delete_object" DELETE_TREE = "delete_tree" SET_CONTENT_STREAM = "set_content_stream" APPLY_ACL = "apply_acl" APPLY_POLICY = "apply_policy" BULK_UPDATE_PROPERTIES = "bulk_update_properties" class CMISBaseType(str, Enum): DOCUMENT = "cmis:document" FOLDER = 
"cmis:folder" RELATIONSHIP = "cmis:relationship" POLICY = "cmis:policy" ITEM = "cmis:item" SECONDARY = "cmis:secondary" ACTION_CAPABILITIES: dict[CMISAction, CMISCapability] = { CMISAction.GET_REPOSITORY_INFO: CMISCapability.REPOSITORY, CMISAction.GET_TYPE_DEFINITION: CMISCapability.TYPE_DEFINITIONS, CMISAction.GET_CHILDREN: CMISCapability.NAVIGATION, CMISAction.GET_OBJECT_PARENTS: CMISCapability.NAVIGATION, CMISAction.GET_OBJECT: CMISCapability.OBJECT_READ, CMISAction.GET_CONTENT_STREAM: CMISCapability.CONTENT_STREAM_READ, CMISAction.GET_ACL: CMISCapability.ACL, CMISAction.QUERY: CMISCapability.DISCOVERY_QUERY, CMISAction.GET_RELATIONSHIPS: CMISCapability.RELATIONSHIPS, CMISAction.GET_CHANGE_LOG: CMISCapability.CHANGE_LOG, CMISAction.CREATE_FOLDER: CMISCapability.OBJECT_WRITE, CMISAction.CREATE_DOCUMENT: CMISCapability.OBJECT_WRITE, CMISAction.UPDATE_PROPERTIES: CMISCapability.OBJECT_WRITE, CMISAction.DELETE_OBJECT: CMISCapability.OBJECT_WRITE, CMISAction.DELETE_TREE: CMISCapability.OBJECT_WRITE, CMISAction.SET_CONTENT_STREAM: CMISCapability.CONTENT_STREAM_WRITE, CMISAction.APPLY_ACL: CMISCapability.ACL, CMISAction.APPLY_POLICY: CMISCapability.POLICY, CMISAction.BULK_UPDATE_PROPERTIES: CMISCapability.BULK_UPDATE, } IMPLEMENTED_CMIS_ACTIONS: frozenset[CMISAction] = frozenset( { CMISAction.GET_REPOSITORY_INFO, CMISAction.GET_TYPE_DEFINITION, CMISAction.GET_CHILDREN, CMISAction.GET_OBJECT_PARENTS, CMISAction.GET_OBJECT, CMISAction.GET_CONTENT_STREAM, CMISAction.GET_ACL, CMISAction.QUERY, CMISAction.GET_RELATIONSHIPS, CMISAction.GET_CHANGE_LOG, CMISAction.CREATE_FOLDER, CMISAction.CREATE_DOCUMENT, CMISAction.UPDATE_PROPERTIES, CMISAction.DELETE_OBJECT, CMISAction.DELETE_TREE, CMISAction.SET_CONTENT_STREAM, } ) MUTATION_ACTIONS = { CMISAction.CREATE_DOCUMENT, CMISAction.CREATE_FOLDER, CMISAction.UPDATE_PROPERTIES, CMISAction.DELETE_OBJECT, CMISAction.DELETE_TREE, CMISAction.SET_CONTENT_STREAM, CMISAction.APPLY_ACL, CMISAction.APPLY_POLICY, 
CMISAction.BULK_UPDATE_PROPERTIES, } NEW_TYPE_SETTABLE_ATTRIBUTES: dict[str, bool] = { "id": False, "local_name": False, "local_namespace": False, "display_name": False, "query_name": False, "description": False, "creatable": False, "fileable": False, "queryable": False, "fulltext_indexed": False, "included_in_supertype_query": False, "controllable_policy": False, "controllable_acl": False, } UNSUPPORTED_FEATURES: dict[str, dict[str, Any]] = { "atompub": {"status": "unsupported", "reason": "binding_not_supported", "intent": "deferred"}, "web_services": {"status": "unsupported", "reason": "binding_not_supported", "intent": "deferred"}, "get_descendants": { "status": "unsupported", "reason": "capability_not_supported", "standard_flag": "capability_get_descendants", }, "get_folder_tree": { "status": "unsupported", "reason": "capability_not_supported", "standard_flag": "capability_get_folder_tree", }, "multifiling": { "status": "projection_only", "reason": "mutation_capability_not_supported", "standard_flag": "capability_multifiling", }, "unfiling": { "status": "unsupported", "reason": "capability_not_supported", "standard_flag": "capability_unfiling", }, "versioning_services": { "status": "unsupported", "reason": "capability_not_supported", "standard_flag": "document_type.versionable", }, "private_working_copy": { "status": "unsupported", "reason": "capability_not_supported", "standard_flags": ["capability_pwc_searchable", "capability_pwc_updatable"], }, "all_versions_search": { "status": "unsupported", "reason": "capability_not_supported", "standard_flag": "capability_all_versions_searchable", }, "full_cmis_sql_joins": { "status": "unsupported", "reason": "query_not_supported", "standard_flag": "capability_join", }, "order_by": {"status": "unsupported", "reason": "query_not_supported", "standard_flag": "capability_order_by"}, "append_content_stream": { "status": "unsupported", "reason": "capability_not_supported", "standard_flag": 
"capability_content_stream_updatability", }, "apply_acl": {"status": "unsupported", "reason": "operation_not_implemented", "standard_flag": "capability_acl"}, "apply_policy": {"status": "unsupported", "reason": "capability_not_supported"}, "remove_policy": {"status": "unsupported", "reason": "capability_not_supported"}, "retention_hold_mutation": {"status": "unsupported", "reason": "capability_not_supported"}, "bulk_update_properties": { "status": "unsupported", "reason": "operation_not_implemented", "standard_service": "bulkUpdateProperties", }, "rendition_streams": { "status": "unsupported", "reason": "capability_not_supported", "standard_flag": "capability_renditions", }, "type_mutability": { "status": "unsupported", "reason": "capability_not_supported", "standard_flag": "capability_new_type_settable_attributes", }, } @dataclass(frozen=True) class CMISAccessProfile: name: str binding: CMISBinding | str = CMISBinding.BROWSER capabilities: tuple[CMISCapability | str, ...] = () allow_mutations: bool = False visible_sensitivities: tuple[Sensitivity | str, ...] = (Sensitivity.PUBLIC, Sensitivity.INTERNAL) denied_sensitivities: tuple[Sensitivity | str, ...] = (Sensitivity.CONFIDENTIAL, Sensitivity.RESTRICTED) visible_asset_types: tuple[str, ...] = () visible_topics: tuple[str, ...] = () visible_source_systems: tuple[str, ...] = () denied_metadata: dict[str, tuple[Any, ...]] = field(default_factory=dict) required_actor_types: tuple[ActorType | str, ...] 
= () metadata: dict[str, Any] = field(default_factory=dict) def __post_init__(self) -> None: object.__setattr__(self, "binding", CMISBinding(self.binding)) object.__setattr__( self, "capabilities", tuple(CMISCapability(capability) for capability in self.capabilities), ) object.__setattr__( self, "visible_sensitivities", tuple(Sensitivity(value) for value in self.visible_sensitivities), ) object.__setattr__( self, "denied_sensitivities", tuple(Sensitivity(value) for value in self.denied_sensitivities), ) object.__setattr__( self, "required_actor_types", tuple(ActorType(value) for value in self.required_actor_types), ) object.__setattr__( self, "denied_metadata", {key: tuple(values) for key, values in self.denied_metadata.items()}, ) @classmethod def readonly_browser(cls) -> "CMISAccessProfile": return cls( name="readonly-browser", capabilities=_read_capabilities(), allow_mutations=False, ) @classmethod def governed_authoring(cls) -> "CMISAccessProfile": return cls( name="governed-authoring", capabilities=_read_capabilities() + ( CMISCapability.OBJECT_WRITE, CMISCapability.CONTENT_STREAM_WRITE, ), allow_mutations=True, ) @classmethod def admin_export(cls) -> "CMISAccessProfile": return cls( name="admin-export", capabilities=_read_capabilities(), allow_mutations=False, visible_sensitivities=( Sensitivity.PUBLIC, Sensitivity.INTERNAL, Sensitivity.CONFIDENTIAL, Sensitivity.RESTRICTED, ), denied_sensitivities=(), required_actor_types=(ActorType.SERVICE_ACCOUNT,), ) @classmethod def compat_tck(cls) -> "CMISAccessProfile": return cls( name="compat-tck", capabilities=_read_capabilities() + ( CMISCapability.OBJECT_WRITE, CMISCapability.CONTENT_STREAM_WRITE, ), allow_mutations=True, metadata={"compatibility": "selected-opencmis-tck-browser-subset"}, ) def has_capability(self, capability: CMISCapability | str) -> bool: return CMISCapability(capability) in self.capabilities def allows_action(self, action: CMISAction | str) -> bool: cmis_action = CMISAction(action) return 
self.action_denial(cmis_action) is None def action_denial(self, action: CMISAction | str) -> tuple[str, dict[str, Any]] | None: cmis_action = CMISAction(action) capability = ACTION_CAPABILITIES[cmis_action] if cmis_action in MUTATION_ACTIONS and not self.allow_mutations: return ( "cmis_mutation_not_allowed", {"profile": self.name, "capability": capability.value, "mutation": cmis_action.value}, ) if not self.has_capability(capability): return ( "cmis_capability_not_supported", {"profile": self.name, "capability": capability.value}, ) if cmis_action not in IMPLEMENTED_CMIS_ACTIONS: return ( "cmis_operation_not_implemented", {"profile": self.name, "capability": capability.value, "operation": cmis_action.value}, ) return None def decide_action( self, action: CMISAction | str, context: OperationContext, *, resource: str = "cmis:repository", ) -> PolicyDecision: cmis_action = CMISAction(action) if not self.allows_actor(context): return PolicyDecision.deny( context.actor.id, cmis_action.value, resource, reason="actor_type_not_allowed_for_cmis_profile", context={"profile": self.name}, ) if self.allows_action(cmis_action): return PolicyDecision.allow( context.actor.id, cmis_action.value, resource, context={"profile": self.name}, ) reason, denial_context = self.action_denial(cmis_action) or ( "cmis_capability_not_supported", {"profile": self.name}, ) return PolicyDecision.deny( context.actor.id, cmis_action.value, resource, reason=reason, context=denial_context, ) def allows_actor(self, context: OperationContext) -> bool: return not self.required_actor_types or context.actor.actor_type in self.required_actor_types def exposes_asset(self, asset: KnowledgeAsset, context: OperationContext) -> bool: return self.decide_asset_visibility(asset, context).allowed def decide_asset_visibility( self, asset: KnowledgeAsset, context: OperationContext, ) -> PolicyDecision: resource = f"asset:{asset.id}" if not self.allows_actor(context): return PolicyDecision.deny( context.actor.id, 
"cmis.expose_asset", resource, reason="actor_type_not_allowed_for_cmis_profile", context={"profile": self.name}, ) classification = asset.classification if asset.lifecycle == LifecycleState.DELETE_REQUESTED: return PolicyDecision.deny( context.actor.id, "cmis.expose_asset", resource, reason="cmis_lifecycle_not_visible", context={"profile": self.name, "lifecycle": _enum_value(asset.lifecycle)}, ) if classification.sensitivity in self.denied_sensitivities: return PolicyDecision.deny( context.actor.id, "cmis.expose_asset", resource, reason="cmis_sensitivity_denied", context={"profile": self.name, "sensitivity": _enum_value(classification.sensitivity)}, ) if classification.sensitivity not in self.visible_sensitivities: return PolicyDecision.deny( context.actor.id, "cmis.expose_asset", resource, reason="cmis_sensitivity_not_visible", context={"profile": self.name, "sensitivity": _enum_value(classification.sensitivity)}, ) if self.visible_asset_types and classification.asset_type not in self.visible_asset_types: return PolicyDecision.deny( context.actor.id, "cmis.expose_asset", resource, reason="cmis_asset_type_not_visible", context={"profile": self.name, "asset_type": classification.asset_type}, ) if self.visible_topics and not set(classification.topics).intersection(self.visible_topics): return PolicyDecision.deny( context.actor.id, "cmis.expose_asset", resource, reason="cmis_topic_not_visible", context={"profile": self.name, "topics": list(classification.topics)}, ) source_system = asset.metadata.get("source_system") or classification.metadata.get("source_system") if self.visible_source_systems and source_system not in self.visible_source_systems: return PolicyDecision.deny( context.actor.id, "cmis.expose_asset", resource, reason="cmis_source_system_not_visible", context={"profile": self.name, "source_system": source_system}, ) for key, denied_values in self.denied_metadata.items(): value = asset.metadata.get(key, classification.metadata.get(key)) if value in 
denied_values: return PolicyDecision.deny( context.actor.id, "cmis.expose_asset", resource, reason="cmis_metadata_denied", context={"profile": self.name, "metadata_key": key}, ) return PolicyDecision.allow( context.actor.id, "cmis.expose_asset", resource, context={"profile": self.name}, ) def to_dict(self) -> dict[str, Any]: return compact_dict( { "name": self.name, "binding": self.binding.value, "capabilities": [capability.value for capability in self.capabilities], "allow_mutations": self.allow_mutations, "visible_sensitivities": [item.value for item in self.visible_sensitivities], "denied_sensitivities": [item.value for item in self.denied_sensitivities], "visible_asset_types": list(self.visible_asset_types), "visible_topics": list(self.visible_topics), "visible_source_systems": list(self.visible_source_systems), "denied_metadata": {key: list(values) for key, values in self.denied_metadata.items()}, "required_actor_types": [item.value for item in self.required_actor_types], "metadata": dict(self.metadata), } ) @classmethod def from_dict(cls, data: dict[str, Any]) -> "CMISAccessProfile": return cls( name=data["name"], binding=data.get("binding", CMISBinding.BROWSER.value), capabilities=tuple(data.get("capabilities", [])), allow_mutations=bool(data.get("allow_mutations", False)), visible_sensitivities=tuple(data.get("visible_sensitivities", [])), denied_sensitivities=tuple(data.get("denied_sensitivities", [])), visible_asset_types=tuple(data.get("visible_asset_types", [])), visible_topics=tuple(data.get("visible_topics", [])), visible_source_systems=tuple(data.get("visible_source_systems", [])), denied_metadata=dict(data.get("denied_metadata", {})), required_actor_types=tuple(data.get("required_actor_types", [])), metadata=dict(data.get("metadata", {})), ) @dataclass(frozen=True) class CMISAccessPoint: access_point_id: str repository_id: str profile: CMISAccessProfile base_path: str root_folder_id: str = "cmis-root" enabled: bool = True metadata: dict[str, Any] = 
field(default_factory=dict) def __post_init__(self) -> None: normalized = "/" + self.base_path.strip("/") object.__setattr__(self, "base_path", normalized) def decide_action( self, action: CMISAction | str, context: OperationContext, *, resource: str | None = None, ) -> PolicyDecision: if not self.enabled: return PolicyDecision.deny( context.actor.id, CMISAction(action).value, resource or f"cmis:{self.repository_id}", reason="cmis_access_point_disabled", context={"access_point_id": self.access_point_id, "profile": self.profile.name}, ) return self.profile.decide_action( action, context, resource=resource or f"cmis:{self.repository_id}", ) def exposes_asset(self, asset: KnowledgeAsset, context: OperationContext) -> bool: return self.enabled and self.profile.exposes_asset(asset, context) def to_dict(self) -> dict[str, Any]: return compact_dict( { "access_point_id": self.access_point_id, "repository_id": self.repository_id, "profile": self.profile.to_dict(), "base_path": self.base_path, "root_folder_id": self.root_folder_id, "enabled": self.enabled, "metadata": dict(self.metadata), } ) @classmethod def from_dict(cls, data: dict[str, Any]) -> "CMISAccessPoint": return cls( access_point_id=data["access_point_id"], repository_id=data["repository_id"], profile=CMISAccessProfile.from_dict(data["profile"]), base_path=data["base_path"], root_folder_id=data.get("root_folder_id", "cmis-root"), enabled=bool(data.get("enabled", True)), metadata=dict(data.get("metadata", {})), ) @dataclass(frozen=True) class CMISObjectProjection: object_id: str base_type_id: CMISBaseType type_id: str name: str properties: dict[str, Any] allowable_actions: tuple[CMISAction, ...] = () path: str | None = None content_stream: dict[str, Any] | None = None version: dict[str, Any] | None = None relationships: tuple[str, ...] 
class CMISDomainMapper:
    """Project engine domain objects into CMIS-shaped envelopes."""

    def __init__(self, access_point: CMISAccessPoint) -> None:
        """Bind the mapper to the access point whose profile governs every projection."""
        self.access_point = access_point

    def repository_info(self) -> dict[str, Any]:
        """Return the native (snake_case) repository description.

        Repository name, description and product version can be overridden
        through the access point's ``metadata``.
        """
        profile = self.access_point.profile
        return {
            "repository_id": self.access_point.repository_id,
            "repository_name": self.access_point.metadata.get("repository_name", self.access_point.repository_id),
            "repository_description": self.access_point.metadata.get(
                "repository_description",
                "Kontextual Engine CMIS Browser Binding access point",
            ),
            "cmis_version_supported": "1.1",
            "root_folder_id": self.access_point.root_folder_id,
            "principal_anonymous": "anonymous",
            "principal_anyone": "anyone",
            "vendor_name": "Kontextual",
            "product_name": "kontextual-engine",
            "product_version": self.access_point.metadata.get("product_version", "0.1.0"),
            "binding": profile.binding.value,
            "capabilities": self.capability_flags(),
            "repository_features": self.repository_features(),
            "unsupported_features": self.unsupported_features(),
            "compliance": {
                "standard": "CMIS 1.1",
                "binding": profile.binding.value,
                "posture": "declared-browser-binding-subset",
                "overclaim_policy": "unsupported_optional_capabilities_are_advertised_as false/none",
            },
            "profile": profile.name,
        }

    def capability_flags(self) -> dict[str, Any]:
        """Return the capability flags advertised for the current profile.

        Only content-stream updatability, changes, query and ACL depend on
        the profile; every other flag is a fixed "not supported" value.
        """
        profile = self.access_point.profile
        return {
            "capability_content_stream_updatability": (
                "anytime"
                if profile.allow_mutations and profile.has_capability(CMISCapability.CONTENT_STREAM_WRITE)
                else "none"
            ),
            "capability_changes": "objectidsonly" if profile.has_capability(CMISCapability.CHANGE_LOG) else "none",
            "capability_renditions": "none",
            "capability_get_descendants": False,
            "capability_get_folder_tree": False,
            "capability_order_by": "none",
            "capability_multifiling": False,
            "capability_unfiling": False,
            "capability_version_specific_filing": False,
            "capability_pwc_searchable": False,
            "capability_pwc_updatable": False,
            "capability_all_versions_searchable": False,
            "capability_query": "metadataonly" if profile.has_capability(CMISCapability.DISCOVERY_QUERY) else "none",
            "capability_join": "none",
            "capability_acl": "discover" if profile.has_capability(CMISCapability.ACL) else "none",
            "capability_new_type_settable_attributes": dict(NEW_TYPE_SETTABLE_ATTRIBUTES),
        }

    def repository_features(self) -> list[dict[str, Any]]:
        """Return the two vendor extension features this repository declares."""
        return [
            {
                "id": "urn:kontextual:cmis:feature:profiled-access-points",
                "common_name": "Profiled CMIS access points",
                "version_label": "1.0",
                "description": "Multiple Browser Binding access points can expose different governed profile slices.",
                "url": "https://docs.oasis-open.org/cmis/CMIS/v1.1/CMIS-v1.1.html",
            },
            {
                "id": "urn:kontextual:cmis:feature:projection-parentage",
                "common_name": "Projection-only parent folder maps",
                "version_label": "1.0",
                "description": (
                    "Assets may appear under multiple virtual folder projections; CMIS multi-filing mutation "
                    "services are not advertised."
                ),
                "url": "https://docs.oasis-open.org/cmis/CMIS/v1.1/CMIS-v1.1.html",
            },
        ]

    def unsupported_features(self) -> dict[str, dict[str, Any]]:
        """Return an independent copy of the module-level ``UNSUPPORTED_FEATURES`` table."""
        import copy

        # Deep copy: some entries hold nested lists ("standard_flags"), which a
        # per-entry dict() copy would still share with the module constant.
        return copy.deepcopy(UNSUPPORTED_FEATURES)

    def type_definitions(self) -> list[dict[str, Any]]:
        """Return one type definition per CMIS base type.

        Document, folder and relationship types receive the profile's
        ``allow_mutations`` flag; policy, item and secondary always get False.
        """
        can_write = self.access_point.profile.allow_mutations
        return [
            _type_definition(CMISBaseType.DOCUMENT, "kontextual:document", "Kontextual Document", can_write),
            _type_definition(CMISBaseType.FOLDER, "kontextual:folder", "Kontextual Folder", can_write),
            _type_definition(
                CMISBaseType.RELATIONSHIP,
                "kontextual:relationship",
                "Kontextual Relationship",
                can_write,
            ),
            _type_definition(CMISBaseType.POLICY, "kontextual:policy", "Kontextual Policy", False),
            _type_definition(CMISBaseType.ITEM, "kontextual:item", "Kontextual Item", False),
            _type_definition(CMISBaseType.SECONDARY, "kontextual:secondary", "Kontextual Secondary", False),
        ]

    def map_asset(
        self,
        asset: KnowledgeAsset,
        context: OperationContext,
        *,
        representations: list[AssetRepresentation] | tuple[AssetRepresentation, ...] = (),
        versions: list[AssetVersion] | tuple[AssetVersion, ...] = (),
        relationship_ids: list[str] | tuple[str, ...] = (),
        metadata_records: list[Any] | tuple[Any, ...] = (),
    ) -> CMISObjectProjection | None:
        """Project ``asset`` as a CMIS document, or return None when it is not exposed."""
        if not self.access_point.exposes_asset(asset, context):
            return None
        current_version = _current_version(asset, versions)
        content_stream = self.map_content_stream(asset, representations)
        return CMISObjectProjection(
            object_id=self.asset_object_id(asset.id),
            base_type_id=CMISBaseType.DOCUMENT,
            type_id=f"kontextual:{asset.classification.asset_type}",
            name=asset.title,
            path=self.asset_path(asset),
            properties=self.asset_properties(asset, metadata_records=metadata_records, content_stream=content_stream),
            allowable_actions=self.allowable_actions(context, has_content_stream=content_stream is not None),
            content_stream=content_stream,
            version=self.version_properties(asset, current_version, versions),
            relationships=tuple(relationship_ids),
            acl=self.acl_for_asset(asset, context),
        )

    def map_relationship(
        self,
        relationship: CoreRelationship,
        context: OperationContext,
    ) -> CMISObjectProjection | None:
        """Project ``relationship`` as a CMIS relationship object.

        Returns None when the access point denies ``GET_RELATIONSHIPS`` for
        the context. Non-asset targets get a ``cmis:entity:`` identifier.
        """
        decision = self.access_point.decide_action(
            CMISAction.GET_RELATIONSHIPS,
            context,
            resource=f"relationship:{relationship.relationship_id}",
        )
        if not decision.allowed:
            return None
        object_id = f"cmis:relationship:{relationship.relationship_id}"
        source_id = self.asset_object_id(relationship.source_id)
        if relationship.target_kind == RelationshipTargetKind.ASSET:
            target_id = self.asset_object_id(relationship.target_id)
        else:
            target_id = f"cmis:entity:{relationship.target_id}"
        return CMISObjectProjection(
            object_id=object_id,
            base_type_id=CMISBaseType.RELATIONSHIP,
            type_id="kontextual:relationship",
            name=relationship.predicate,
            properties={
                "cmis:objectId": object_id,
                "cmis:name": relationship.predicate,
                "cmis:baseTypeId": CMISBaseType.RELATIONSHIP.value,
                "cmis:objectTypeId": "kontextual:relationship",
                "cmis:sourceId": source_id,
                "cmis:targetId": target_id,
                "kontextual:predicate": relationship.predicate,
                "kontextual:confidence": relationship.confidence,
                "kontextual:targetKind": relationship.target_kind.value,
            },
            allowable_actions=(CMISAction.GET_OBJECT, CMISAction.GET_RELATIONSHIPS),
        )

    def acl_for_asset(self, asset: KnowledgeAsset, context: OperationContext) -> dict[str, Any] | None:
        """Derive an ACL envelope for ``asset``, or None when the profile hides it.

        The calling actor gets read (plus write/delete when the profile
        allows mutations); public assets add a read entry for "anyone".
        """
        # NOTE(review): this asks the profile directly, so the access point's
        # `enabled` flag is not consulted here; map_asset checks it beforehand.
        # Confirm whether direct callers should be gated as well.
        visibility = self.access_point.profile.decide_asset_visibility(asset, context)
        if not visibility.allowed:
            return None
        permissions = ["cmis:read"]
        if self.access_point.profile.allow_mutations:
            permissions.extend(["cmis:write", "cmis:delete"])
        entries = [
            {
                "principal_id": context.actor.id,
                "permissions": permissions,
                "direct": True,
            }
        ]
        if asset.classification.sensitivity == Sensitivity.PUBLIC:
            entries.append(
                {
                    "principal_id": "anyone",
                    "permissions": ["cmis:read"],
                    "direct": False,
                }
            )
        return {
            "object_id": self.asset_object_id(asset.id),
            "is_exact": True,
            "aces": entries,
            "derived_from": "kontextual-profile-policy",
            "profile": self.access_point.profile.name,
        }

    def asset_object_id(self, asset_id: str) -> str:
        """Return the CMIS object id for an engine asset id."""
        return f"cmis:asset:{asset_id}"

    def folder_object_id(self, path: str) -> str:
        """Return the CMIS object id for a virtual folder path ("/" becomes "::")."""
        return "cmis:folder:" + _normalize_path(path).strip("/").replace("/", "::")

    def asset_path(self, asset: KnowledgeAsset) -> str:
        """Return the asset's primary path: the first entry of ``asset_paths``."""
        return self.asset_paths(asset)[0]

    def asset_paths(self, asset: KnowledgeAsset) -> tuple[str, ...]:
        """Return every virtual path the asset appears under, de-duplicated in order.

        Order: explicit ``cmis_path``, ``cmis_paths``, first source ref, one
        per topic, owner, lifecycle, asset type. The last two are always
        present, so the result is never empty.
        """
        paths: list[str] = []
        explicit = asset.metadata.get("cmis_path")
        if explicit:
            paths.append(_normalize_path(str(explicit)))
        extra_paths = asset.metadata.get("cmis_paths", ())
        if isinstance(extra_paths, str):
            # A single string is one path; iterating it would yield characters.
            extra_paths = (extra_paths,)
        paths.extend(_normalize_path(str(value)) for value in extra_paths)
        if asset.source_refs:
            # Only the first source reference contributes paths.
            source_ref = asset.source_refs[0]
            source_root = _safe_path_segment(source_ref.source_system)
            if source_ref.path:
                paths.append(_normalize_path(f"/sources/{source_root}/{source_ref.path}"))
            if source_ref.external_id:
                paths.append(_normalize_path(f"/sources/{source_root}/{source_ref.external_id}"))
        paths.extend(_normalize_path(f"/topics/{topic}/{asset.id}") for topic in asset.classification.topics)
        if asset.classification.owner:
            paths.append(_normalize_path(f"/owners/{asset.classification.owner}/{asset.id}"))
        paths.append(_normalize_path(f"/lifecycle/{_enum_value(asset.lifecycle)}/{asset.id}"))
        paths.append(_normalize_path(f"/assets/{asset.classification.asset_type}/{asset.id}"))
        # dict.fromkeys drops duplicates while keeping first-seen order.
        return tuple(dict.fromkeys(paths))

    def parent_folders_for_asset(self, asset: KnowledgeAsset) -> tuple[dict[str, Any], ...]:
        """Return one folder descriptor per distinct parent of the asset's paths."""
        by_path: dict[str, dict[str, Any]] = {}
        for path in self.asset_paths(asset):
            parent = _parent_path(path)
            # Assigning by key keeps the first position and the latest value.
            by_path[parent] = {
                "object_id": self.folder_object_id(parent),
                "path": parent,
                "name": _path_name(parent),
                "base_type_id": CMISBaseType.FOLDER.value,
                "type_id": "kontextual:folder",
                "filing_source": _filing_source(parent),
            }
        return tuple(by_path.values())

    def folder_projection(self, path: str) -> dict[str, Any]:
        """Return the CMIS envelope for the virtual folder at ``path``.

        Virtual folders have fixed system authorship and epoch timestamps;
        folders directly under "/" use the access point's root folder id
        as their parent.
        """
        normalized = _normalize_path(path)
        parent = _parent_path(normalized)
        parent_id = self.access_point.root_folder_id if parent == "/" else self.folder_object_id(parent)
        return {
            "object_id": self.folder_object_id(normalized),
            "base_type_id": CMISBaseType.FOLDER.value,
            "type_id": "kontextual:folder",
            "name": _path_name(normalized),
            "path": normalized,
            "properties": {
                "cmis:objectId": self.folder_object_id(normalized),
                "cmis:name": _path_name(normalized),
                "cmis:baseTypeId": CMISBaseType.FOLDER.value,
                "cmis:objectTypeId": "kontextual:folder",
                "cmis:createdBy": "system",
                "cmis:lastModifiedBy": "system",
                "cmis:creationDate": "1970-01-01T00:00:00+00:00",
                "cmis:lastModificationDate": "1970-01-01T00:00:00+00:00",
                "cmis:changeToken": f"folder:{normalized}",
                "cmis:description": "Virtual CMIS folder projection",
                "cmis:secondaryObjectTypeIds": [],
                "cmis:parentId": parent_id,
                "cmis:path": normalized,
                "cmis:allowedChildObjectTypeIds": [CMISBaseType.DOCUMENT.value, CMISBaseType.FOLDER.value],
                "kontextual:sensitivity": "internal",
                "kontextual:lifecycle": LifecycleState.ACTIVE.value,
                "kontextual:filingSource": _filing_source(normalized),
            },
            "allowable_actions": [CMISAction.GET_CHILDREN.value],
        }

    def asset_properties(
        self,
        asset: KnowledgeAsset,
        *,
        metadata_records: list[Any] | tuple[Any, ...] = (),
        content_stream: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Build the CMIS property map for ``asset``.

        Each metadata record with a truthy ``key`` attribute adds a
        ``kontextual:metadata:<key>`` property. After ``compact_dict`` runs,
        a fixed set of properties is re-added with neutral defaults.
        """
        classification = asset.classification
        # An absent or empty stream yields None for all four stream properties.
        stream = content_stream or {}
        properties = {
            "cmis:objectId": self.asset_object_id(asset.id),
            "cmis:name": asset.title,
            "cmis:baseTypeId": CMISBaseType.DOCUMENT.value,
            "cmis:objectTypeId": f"kontextual:{classification.asset_type}",
            "cmis:createdBy": asset.metadata.get("created_by"),
            "cmis:lastModifiedBy": asset.metadata.get("updated_by"),
            "cmis:creationDate": asset.created_at,
            "cmis:lastModificationDate": asset.updated_at,
            "cmis:changeToken": asset.current_version_id,
            "cmis:description": asset.metadata.get("description", asset.title),
            "cmis:secondaryObjectTypeIds": list(asset.metadata.get("cmis_secondary_object_type_ids", ())),
            "cmis:path": self.asset_path(asset),
            "cmis:contentStreamLength": stream.get("length"),
            "cmis:contentStreamMimeType": stream.get("mime_type"),
            "cmis:contentStreamFileName": stream.get("file_name"),
            "cmis:contentStreamId": stream.get("stream_id"),
            "kontextual:assetId": asset.id,
            "kontextual:assetType": classification.asset_type,
            "kontextual:sensitivity": _enum_value(classification.sensitivity),
            "kontextual:lifecycle": _enum_value(asset.lifecycle),
            "kontextual:owner": classification.owner,
            "kontextual:topics": list(classification.topics),
            "kontextual:reviewState": classification.review_state,
        }
        for record in metadata_records:
            key = getattr(record, "key", None)
            if key:
                properties[f"kontextual:metadata:{key}"] = getattr(record, "value", None)
        compacted = compact_dict(properties)
        # Restore properties that must be present even if compaction dropped them.
        compacted.setdefault("cmis:createdBy", "system")
        compacted.setdefault("cmis:lastModifiedBy", "system")
        compacted.setdefault("cmis:secondaryObjectTypeIds", [])
        compacted.setdefault("kontextual:owner", "")
        compacted.setdefault("kontextual:topics", [])
        compacted.setdefault("kontextual:reviewState", "")
        return compacted

    def map_content_stream(
        self,
        asset: KnowledgeAsset,
        representations: list[AssetRepresentation] | tuple[AssetRepresentation, ...],
    ) -> dict[str, Any] | None:
        """Describe the preferred representation as a content stream, or return None."""
        representation = _preferred_representation(representations)
        if representation is None:
            return None
        return compact_dict(
            {
                "stream_id": representation.representation_id,
                "file_name": asset.metadata.get("file_name", asset.title),
                "mime_type": representation.media_type,
                "length": representation.size_bytes,
                "digest": representation.digest,
                "storage_ref": representation.storage_ref,
                "kind": representation.kind.value,
            }
        )

    def version_properties(
        self,
        asset: KnowledgeAsset,
        current_version: AssetVersion | None,
        versions: list[AssetVersion] | tuple[AssetVersion, ...],
    ) -> dict[str, Any]:
        """Return CMIS versioning properties for ``asset``.

        Without a current version the asset is reported as the latest
        version, labelled "1". Otherwise the label is the version's sequence
        and "latest" means its sequence equals the highest in ``versions``.
        Check-out and PWC properties are fixed "not checked out" values.
        """
        if current_version is None:
            is_latest = True
            label = "1"
        else:
            latest_sequence = max(
                (version.sequence for version in versions),
                default=current_version.sequence,
            )
            is_latest = current_version.sequence == latest_sequence
            label = str(current_version.sequence)
        properties: dict[str, Any] = {
            "cmis:isLatestVersion": is_latest,
            "cmis:isMajorVersion": True,
            "cmis:isLatestMajorVersion": is_latest,
            "cmis:versionSeriesId": f"cmis:version-series:{asset.id}",
            "cmis:versionLabel": label,
            "cmis:isVersionSeriesCheckedOut": False,
            "cmis:isPrivateWorkingCopy": False,
            "cmis:versionSeriesCheckedOutBy": None,
            "cmis:versionSeriesCheckedOutId": None,
            "cmis:checkinComment": "",
            "cmis:isImmutable": False,
        }
        if current_version is not None:
            properties["kontextual:versionId"] = current_version.version_id
            properties["kontextual:versionChangeType"] = current_version.change_type.value
        return properties

    def allowable_actions(
        self,
        context: OperationContext,
        *,
        has_content_stream: bool,
    ) -> tuple[CMISAction, ...]:
        """Return the candidate document actions the access point allows.

        ``GET_CONTENT_STREAM`` is left out when the object has no stream.
        """
        candidates = (
            CMISAction.GET_OBJECT,
            CMISAction.GET_CONTENT_STREAM,
            CMISAction.GET_ACL,
            CMISAction.GET_RELATIONSHIPS,
            CMISAction.GET_OBJECT_PARENTS,
            CMISAction.UPDATE_PROPERTIES,
            CMISAction.DELETE_OBJECT,
            CMISAction.SET_CONTENT_STREAM,
        )
        return tuple(
            action
            for action in candidates
            if (has_content_stream or action != CMISAction.GET_CONTENT_STREAM)
            and self.access_point.decide_action(action, context).allowed
        )
def cmis_browser_capabilities(capabilities: dict[str, Any]) -> dict[str, Any]:
    """Translate native capability settings into Browser Binding names.

    Enumerated capabilities are copied as-is and default to ``"none"``;
    boolean capabilities are coerced with ``bool`` and default to ``False``.
    ``capabilityCreatablePropertyTypes`` is always reported as an empty
    ``canCreate`` list.

    Args:
        capabilities: Native mapping keyed by ``capability_*`` names. The
            optional ``capability_new_type_settable_attributes`` entry is a
            nested mapping of snake_case attribute flags.

    Returns:
        A new mapping keyed by the camelCase CMIS capability names.
    """

    def choice(native_key: str) -> Any:
        return capabilities.get(native_key, "none")

    def flag(source: dict[str, Any], native_key: str) -> bool:
        return bool(source.get(native_key, False))

    settable = capabilities.get("capability_new_type_settable_attributes", {})
    settable_attributes = {
        browser_key: flag(settable, native_key)
        for browser_key, native_key in (
            ("id", "id"),
            ("localName", "local_name"),
            ("localNamespace", "local_namespace"),
            ("displayName", "display_name"),
            ("queryName", "query_name"),
            ("description", "description"),
            ("creatable", "creatable"),
            ("fileable", "fileable"),
            ("queryable", "queryable"),
            ("fulltextIndexed", "fulltext_indexed"),
            ("includedInSupertypeQuery", "included_in_supertype_query"),
            ("controllablePolicy", "controllable_policy"),
            ("controllableACL", "controllable_acl"),
        )
    }

    document: dict[str, Any] = {
        "capabilityContentStreamUpdatability": choice("capability_content_stream_updatability"),
        "capabilityChanges": choice("capability_changes"),
        "capabilityRenditions": choice("capability_renditions"),
    }
    for browser_key, native_key in (
        ("capabilityGetDescendants", "capability_get_descendants"),
        ("capabilityGetFolderTree", "capability_get_folder_tree"),
        ("capabilityMultifiling", "capability_multifiling"),
        ("capabilityUnfiling", "capability_unfiling"),
        ("capabilityVersionSpecificFiling", "capability_version_specific_filing"),
        ("capabilityPWCSearchable", "capability_pwc_searchable"),
        ("capabilityPWCUpdatable", "capability_pwc_updatable"),
        ("capabilityAllVersionsSearchable", "capability_all_versions_searchable"),
    ):
        document[browser_key] = flag(capabilities, native_key)
    for browser_key, native_key in (
        ("capabilityOrderBy", "capability_order_by"),
        ("capabilityQuery", "capability_query"),
        ("capabilityJoin", "capability_join"),
        ("capabilityACL", "capability_acl"),
    ):
        document[browser_key] = choice(native_key)
    document["capabilityCreatablePropertyTypes"] = {"canCreate": []}
    document["capabilityNewTypeSettableAttributes"] = settable_attributes
    return document


def cmis_browser_extended_features(repository_info: dict[str, Any]) -> list[dict[str, Any]]:
    """Serialize the ``repository_features`` entries of ``repository_info``.

    Entries that are not dicts are skipped. Each remaining entry is mapped
    to the Browser Binding feature keys and passed through ``compact_dict``;
    a missing ``url`` is reported as an empty string.
    """
    return [
        compact_dict(
            {
                "id": entry.get("id"),
                "commonName": entry.get("common_name"),
                "versionLabel": entry.get("version_label"),
                "description": entry.get("description"),
                "url": entry.get("url", ""),
            }
        )
        for entry in repository_info.get("repository_features", [])
        if isinstance(entry, dict)
    ]
def cmis_browser_type_children(
    type_definitions: list[dict[str, Any]],
    *,
    type_id: str | None = None,
    skip_count: int = 0,
    max_items: int = 100,
    include_property_definitions: bool = False,
) -> dict[str, Any]:
    """Return one page of serialized type definitions as a CMIS type list.

    Args:
        type_definitions: Native type definitions to serialize.
        type_id: When given, only definitions whose ``parentId`` equals this
            value are listed; when empty or ``None`` all are listed.
        skip_count: Number of leading entries to skip; negatives count as 0.
        max_items: Maximum page size; negatives count as 0.
        include_property_definitions: Whether each serialized type carries
            its property definitions.

    Returns:
        ``{"types", "hasMoreItems", "numItems"}`` where ``numItems`` is the
        size of the whole (filtered) result set, not of the returned page.
    """
    definitions = _browser_type_definitions(
        type_definitions,
        include_property_definitions=include_property_definitions,
    )
    if type_id:
        definitions = [
            definition for definition in definitions if definition.get("parentId") == type_id
        ]

    start = max(skip_count, 0)
    limit = max(max_items, 0)
    page = definitions[start : start + limit]
    return {
        "types": page,
        "hasMoreItems": len(definitions) > start + len(page),
        # CMIS defines numItems as the total number of items in the result
        # set, independent of skipCount/maxItems; reporting the page length
        # makes paging clients believe the listing is already complete.
        "numItems": len(definitions),
    }


def cmis_browser_type_descendants(
    type_definitions: list[dict[str, Any]],
    *,
    type_id: str | None = None,
    include_property_definitions: bool = False,
) -> list[dict[str, Any]]:
    """Return serialized type definitions as a flat descendants list.

    Each entry is ``{"type": <definition>, "children": []}``; no nesting is
    produced. When ``type_id`` is given, only definitions whose ``parentId``
    equals it are included.
    """
    definitions = _browser_type_definitions(
        type_definitions,
        include_property_definitions=include_property_definitions,
    )
    if type_id:
        definitions = [
            definition for definition in definitions if definition.get("parentId") == type_id
        ]
    return [{"type": definition, "children": []} for definition in definitions]
def cmis_browser_object(projection: dict[str, Any]) -> dict[str, Any]:
    """Serialize one native object projection as a Browser Binding object.

    The result carries the expanded ``properties``, the same values as
    ``succinctProperties``, and the ``allowableActions`` flags. ``acl`` and
    ``exactACL`` are added only when the projection has a non-empty dict
    under ``"acl"``. The assembled mapping is passed through ``compact_dict``.
    """
    succinct = _browser_object_properties(projection)
    expanded = {}
    for property_id, value in succinct.items():
        expanded[property_id] = cmis_browser_property_value(property_id, value)

    native_actions = projection.get("allowable_actions", [])
    document = {
        "properties": expanded,
        "succinctProperties": succinct,
        "allowableActions": cmis_browser_allowable_actions(
            native_actions,
            base_type_id=succinct.get("cmis:baseTypeId"),
        ),
    }

    native_acl = projection.get("acl")
    # An empty dict or a non-dict value is treated as "no ACL to report".
    if isinstance(native_acl, dict) and native_acl:
        document["acl"] = cmis_browser_acl(native_acl)
        document["exactACL"] = bool(native_acl.get("is_exact", True))
    return compact_dict(document)


def cmis_browser_object_in_folder_list(children: dict[str, Any]) -> dict[str, Any]:
    """Serialize a native children listing as an object-in-folder list.

    Each child's ``pathSegment`` is its ``name``, falling back to its
    ``object_id`` when the name is missing or empty. ``numItems`` defaults to
    the number of serialized children when ``num_items`` is absent.
    """
    entries = [
        {
            "object": cmis_browser_object(child),
            "pathSegment": child.get("name") or child.get("object_id"),
        }
        for child in children.get("objects", [])
    ]
    more = bool(children.get("has_more_items", False))
    total = int(children.get("num_items", len(entries)))
    return {"objects": entries, "hasMoreItems": more, "numItems": total}


def cmis_browser_object_list(objects: list[dict[str, Any]], *, has_more_items: bool = False) -> dict[str, Any]:
    """Serialize native projections as a Browser Binding object list.

    ``numItems`` is the length of ``objects``; ``hasMoreItems`` is passed
    through from the caller unchanged.
    """
    serialized = []
    for projection in objects:
        serialized.append(cmis_browser_object(projection))
    return {
        "objects": serialized,
        "hasMoreItems": has_more_items,
        "numItems": len(objects),
    }


def cmis_browser_parent_list(parents: dict[str, Any]) -> list[dict[str, Any]]:
    """Serialize the ``parents`` entries of a native parent listing.

    Each entry pairs the serialized parent object with its ``name`` as the
    ``relativePathSegment``.
    """
    return [
        {
            "object": cmis_browser_object(entry),
            "relativePathSegment": entry.get("name"),
        }
        for entry in parents.get("parents", [])
    ]
def cmis_browser_query_result(query_result: dict[str, Any]) -> dict[str, Any]:
    """Serialize a native query result as a Browser Binding query response.

    Every row is reported with ``succinctProperties`` only. ``numItems``
    defaults to the number of rows when ``num_items`` is absent.
    """
    rows = []
    for row in query_result.get("results", []):
        rows.append({"succinctProperties": _browser_object_properties(row)})
    more = bool(query_result.get("has_more_items", False))
    total = int(query_result.get("num_items", len(rows)))
    return {"results": rows, "hasMoreItems": more, "numItems": total}


def cmis_browser_acl(acl: dict[str, Any]) -> dict[str, Any]:
    """Serialize a native ACL mapping as a Browser Binding ACL.

    Each native ACE contributes its ``principal_id``, a list copy of its
    ``permissions`` and its ``direct`` flag (default ``True``). ``isExact``
    defaults to ``True`` when ``is_exact`` is absent.
    """
    entries = [
        {
            "principal": {"principalId": entry.get("principal_id")},
            "permissions": list(entry.get("permissions", [])),
            "isDirect": bool(entry.get("direct", True)),
        }
        for entry in acl.get("aces", [])
    ]
    return {"aces": entries, "isExact": bool(acl.get("is_exact", True))}


def cmis_browser_property_definition(
    property_id: str,
    definition: dict[str, Any],
) -> dict[str, Any]:
    """Serialize one native property definition for the Browser Binding.

    The local and display names are the part of ``property_id`` after the
    first ``":"``. When the definition gives no ``updatability``, ``cmis:``
    properties default to ``"readonly"`` and all others to ``"readwrite"``.
    """
    short_name = property_id.split(":", 1)[-1]
    if property_id.startswith("cmis:"):
        fallback_updatability = "readonly"
    else:
        fallback_updatability = "readwrite"

    def flag(key: str, default: bool) -> bool:
        return bool(definition.get(key, default))

    return {
        "id": property_id,
        "localName": short_name,
        "displayName": short_name,
        "queryName": property_id,
        "description": property_id,
        "propertyType": definition.get("property_type", "string"),
        "cardinality": definition.get("cardinality", "single"),
        "updatability": definition.get("updatability", fallback_updatability),
        "inherited": flag("inherited", False),
        "required": flag("required", False),
        "queryable": flag("queryable", True),
        "orderable": flag("orderable", False),
        "openChoice": flag("open_choice", True),
    }


def cmis_browser_property_value(property_id: str, value: Any) -> dict[str, Any]:
    """Wrap a property value in the expanded Browser Binding property form.

    The type comes from ``_browser_property_type``; the cardinality is
    ``"multi"`` only when ``value`` is a ``list``.
    """
    short_name = property_id.split(":", 1)[-1]
    cardinality = "single"
    if isinstance(value, list):
        cardinality = "multi"
    return {
        "id": property_id,
        "localName": short_name,
        "displayName": short_name,
        "queryName": property_id,
        "type": _browser_property_type(property_id, value),
        "cardinality": cardinality,
        "value": value,
    }
is_folder = base_type_id == CMISBaseType.FOLDER.value return { "canGetObjectParents": CMISAction.GET_OBJECT_PARENTS.value in native, "canGetProperties": CMISAction.GET_OBJECT.value in native, "canGetObjectRelationships": CMISAction.GET_RELATIONSHIPS.value in native, "canGetContentStream": CMISAction.GET_CONTENT_STREAM.value in native, "canGetACL": CMISAction.GET_ACL.value in native, "canUpdateProperties": CMISAction.UPDATE_PROPERTIES.value in native, "canDeleteObject": CMISAction.DELETE_OBJECT.value in native, "canSetContentStream": CMISAction.SET_CONTENT_STREAM.value in native, "canGetChildren": CMISAction.GET_CHILDREN.value in native, "canCreateDocument": CMISAction.CREATE_DOCUMENT.value in native, "canCreateFolder": CMISAction.CREATE_FOLDER.value in native, "canCreateRelationship": False, "canCreateItem": False, "canDeleteTree": CMISAction.DELETE_TREE.value in native, "canGetDescendants": False, "canGetFolderTree": False, "canGetFolderParent": is_folder and CMISAction.GET_OBJECT_PARENTS.value in native, "canGetRenditions": False, "canMoveObject": False, "canAddObjectToFolder": False, "canRemoveObjectFromFolder": False, "canCheckOut": False, "canCancelCheckOut": False, "canCheckIn": False, "canGetAllVersions": False, "canApplyPolicy": False, "canRemovePolicy": False, "canGetAppliedPolicies": False, "canApplyACL": False, "canDeleteContentStream": False, } def cmis_browser_root_folder(access_point: CMISAccessPoint) -> dict[str, Any]: allowable_actions = [CMISAction.GET_OBJECT.value, CMISAction.GET_CHILDREN.value] if access_point.profile.allow_mutations: allowable_actions.extend([CMISAction.CREATE_DOCUMENT.value, CMISAction.CREATE_FOLDER.value]) return { "object_id": access_point.root_folder_id, "base_type_id": CMISBaseType.FOLDER.value, "type_id": CMISBaseType.FOLDER.value, "name": "root", "path": "/", "properties": { "cmis:objectId": access_point.root_folder_id, "cmis:name": "root", "cmis:baseTypeId": CMISBaseType.FOLDER.value, "cmis:objectTypeId": 
def _browser_acl_capabilities() -> dict[str, Any]:
    """Return the fixed ACL capability document advertised to clients.

    It declares the ``basic`` permission set with ``objectonly`` propagation,
    the three permissions ``cmis:read`` / ``cmis:write`` / ``cmis:all``, and
    a mapping from each listed operation key to the one permission it
    needs. A fresh structure is built on every call.
    """
    read, write = "cmis:read", "cmis:write"

    permissions = [
        {"permission": name, "description": label}
        for name, label in ((read, "Read"), (write, "Write"), ("cmis:all", "All"))
    ]
    permission_mapping = [
        {"key": operation, "permission": [needed]}
        for operation, needed in (
            ("canGetProperties.Object", read),
            ("canViewContent.Object", read),
            ("canGetChildren.Folder", read),
            ("canGetParents.Folder", read),
            ("canUpdateProperties.Object", write),
            ("canDelete.Object", write),
            ("canSetContent.Document", write),
        )
    ]
    return {
        "supportedPermissions": "basic",
        "propagation": "objectonly",
        "permissions": permissions,
        "permissionMapping": permission_mapping,
    }
def _browser_type_id(type_definition: dict[str, Any]) -> str:
    """Return the type id to expose for a native type definition.

    A string ``base_type_id`` starting with ``"cmis:"`` is used directly;
    otherwise the definition's own ``id`` is returned, converted with
    ``str``.
    """
    base_id = type_definition.get("base_type_id")
    uses_cmis_prefix = isinstance(base_id, str) and base_id.startswith("cmis:")
    return base_id if uses_cmis_prefix else str(type_definition.get("id"))


def _browser_type_display_name(type_definition: dict[str, Any]) -> str:
    """Return the display name for a native type definition.

    The six CMIS base types get fixed labels. Any other definition uses its
    ``display_name``, then its ``id``, then the literal ``"CMIS Type"``.
    """
    base_id = type_definition.get("base_type_id")
    fixed_labels = (
        (CMISBaseType.DOCUMENT, "Document"),
        (CMISBaseType.FOLDER, "Folder"),
        (CMISBaseType.RELATIONSHIP, "Relationship"),
        (CMISBaseType.POLICY, "Policy"),
        (CMISBaseType.ITEM, "Item"),
        (CMISBaseType.SECONDARY, "Secondary"),
    )
    for base_type, label in fixed_labels:
        if base_id == base_type.value:
            return label
    fallback = type_definition.get("display_name", type_definition.get("id", "CMIS Type"))
    return str(fallback)
_browser_propdef( "id", cardinality="multi", required=False, updatability="readwrite", ), } if base_id == CMISBaseType.FOLDER.value: common.update( { "cmis:parentId": _browser_propdef("id", required=False, updatability="readonly"), "cmis:path": _browser_propdef("string", required=False, updatability="readonly"), "cmis:allowedChildObjectTypeIds": _browser_propdef( "id", cardinality="multi", required=False, updatability="readonly", ), } ) if base_id == CMISBaseType.DOCUMENT.value: common.update( { "cmis:isImmutable": _browser_propdef("boolean", required=False, updatability="readonly"), "cmis:isLatestVersion": _browser_propdef("boolean", required=False, updatability="readonly"), "cmis:isMajorVersion": _browser_propdef("boolean", required=False, updatability="readonly"), "cmis:isLatestMajorVersion": _browser_propdef("boolean", required=False, updatability="readonly"), "cmis:versionLabel": _browser_propdef("string", required=False, updatability="readonly"), "cmis:versionSeriesId": _browser_propdef("id", required=False, updatability="readonly"), "cmis:isVersionSeriesCheckedOut": _browser_propdef( "boolean", required=False, updatability="readonly", ), "cmis:isPrivateWorkingCopy": _browser_propdef( "boolean", required=False, updatability="readonly", ), "cmis:versionSeriesCheckedOutBy": _browser_propdef( "string", required=False, updatability="readonly", ), "cmis:versionSeriesCheckedOutId": _browser_propdef("id", required=False, updatability="readonly"), "cmis:checkinComment": _browser_propdef("string", required=False, updatability="readonly"), "cmis:contentStreamLength": _browser_propdef("integer", required=False, updatability="readonly"), "cmis:contentStreamMimeType": _browser_propdef("string", required=False, updatability="readonly"), "cmis:contentStreamFileName": _browser_propdef("string", required=False, updatability="readonly"), "cmis:contentStreamId": _browser_propdef("id", required=False, updatability="readonly"), } ) if base_id == CMISBaseType.RELATIONSHIP.value: 
def _browser_propdef(
    property_type: str,
    *,
    cardinality: str = "single",
    required: bool = False,
    updatability: str = "readonly",
    queryable: bool = True,
    orderable: bool = False,
    open_choice: bool = True,
) -> dict[str, Any]:
    """Build one native property-definition mapping from its attributes.

    The returned keys are the snake_case names that
    ``cmis_browser_property_definition`` reads.
    """
    return {
        "property_type": property_type,
        "cardinality": cardinality,
        "required": required,
        "updatability": updatability,
        "queryable": queryable,
        "orderable": orderable,
        "open_choice": open_choice,
    }


def _browser_object_properties(projection: dict[str, Any]) -> dict[str, Any]:
    """Flatten a native projection into one CMIS property mapping.

    The projection's ``properties`` are copied, overlaid with its
    ``version`` properties, and then the identity properties are set from
    the projection's top-level fields. A missing or ``None`` ``properties``
    or ``version`` entry is treated as empty.

    Returns:
        A new dict; the projection itself is not modified.
    """
    # `or {}` also covers an explicit None, which dict()/update() reject.
    properties = dict(projection.get("properties") or {})
    properties.update(projection.get("version") or {})

    base_type = projection.get("base_type_id") or properties.get("cmis:baseTypeId")
    properties["cmis:objectId"] = projection.get("object_id", properties.get("cmis:objectId"))
    properties["cmis:name"] = projection.get("name", properties.get("cmis:name"))
    properties["cmis:baseTypeId"] = base_type
    # Documents and folders are always exposed under their base type id;
    # other objects keep the projection's own type id.
    if base_type in {CMISBaseType.DOCUMENT.value, CMISBaseType.FOLDER.value}:
        properties["cmis:objectTypeId"] = base_type
    else:
        properties["cmis:objectTypeId"] = projection.get("type_id", properties.get("cmis:objectTypeId"))
    return properties


def _browser_property_type(property_id: str, value: Any) -> str:
    """Return the CMIS property type name for one property value.

    Well-known property ids decide the type regardless of the value, so a
    ``None`` value still gets the declared type. For any other property the
    type is inferred from the Python value, falling back to ``"string"``.

    Args:
        property_id: Full property id such as ``"cmis:objectId"``.
        value: The property's value; only inspected for unknown ids.

    Returns:
        One of ``"id"``, ``"datetime"``, ``"integer"``, ``"boolean"``,
        ``"decimal"`` or ``"string"``.
    """
    # Every property this module declares with property_type "id" in its
    # definition tables must also be reported as "id" on object values;
    # otherwise a value's type contradicts its own type definition.
    id_properties = {
        "cmis:objectId",
        "cmis:baseTypeId",
        "cmis:objectTypeId",
        "cmis:contentStreamId",
        "cmis:sourceId",
        "cmis:targetId",
        "cmis:parentId",
        "cmis:secondaryObjectTypeIds",
        "cmis:allowedChildObjectTypeIds",
        "cmis:versionSeriesId",
        "cmis:versionSeriesCheckedOutId",
        "kontextual:assetId",
        "kontextual:versionId",
    }
    if property_id in id_properties:
        return "id"
    if property_id in {"cmis:creationDate", "cmis:lastModificationDate"}:
        return "datetime"
    if property_id == "cmis:contentStreamLength":
        return "integer"
    # bool must be tested before int because bool is a subclass of int.
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "integer"
    if isinstance(value, float):
        return "decimal"
    return "string"
def _read_capabilities() -> tuple[CMISCapability, ...]:
    """Return the capabilities that only involve reading."""
    readable = [
        CMISCapability.REPOSITORY,
        CMISCapability.TYPE_DEFINITIONS,
        CMISCapability.NAVIGATION,
        CMISCapability.OBJECT_READ,
        CMISCapability.CONTENT_STREAM_READ,
        CMISCapability.DISCOVERY_QUERY,
        CMISCapability.RELATIONSHIPS,
        CMISCapability.ACL,
        CMISCapability.CHANGE_LOG,
    ]
    return tuple(readable)


def _enum_value(value: Any) -> Any:
    """Return ``value.value`` when that attribute exists, else ``value``."""
    try:
        return value.value
    except AttributeError:
        return value


def _type_definition(
    base_type_id: CMISBaseType,
    type_id: str,
    display_name: str,
    can_write: bool,
) -> dict[str, Any]:
    """Build the native type definition for one CMIS base type.

    Only documents are queryable, ACL-controllable and included in
    supertype queries. Documents and folders are fileable, and creatable
    when ``can_write`` is set. No type is versionable, policy-controllable
    or full-text indexed.
    """
    is_document = base_type_id == CMISBaseType.DOCUMENT
    is_fileable = base_type_id in {CMISBaseType.DOCUMENT, CMISBaseType.FOLDER}

    definition: dict[str, Any] = {
        "id": type_id,
        "local_name": type_id.split(":", 1)[-1],
        "display_name": display_name,
        "base_type_id": base_type_id.value,
    }
    definition["queryable"] = is_document
    definition["controllable_acl"] = is_document
    definition["controllable_policy"] = False
    definition["creatable"] = can_write and is_fileable
    definition["fileable"] = is_fileable
    definition["fulltext_indexed"] = False
    definition["included_in_supertype_query"] = is_document
    definition["versionable"] = False
    definition["property_definitions"] = _property_definitions(base_type_id)
    return definition


def _property_definitions(base_type_id: CMISBaseType) -> dict[str, dict[str, Any]]:
    """Return the native property definitions for one CMIS base type.

    All types share the common object properties plus the two
    ``kontextual:`` sensitivity/lifecycle properties. Documents add the
    content-stream, versioning and asset properties; folders add the
    filing properties; relationships add the source and target ids.
    """

    def spec(property_type: str, *, multi: bool = False, required: bool = False) -> dict[str, Any]:
        # One fresh mapping per property so entries never share state.
        return {
            "property_type": property_type,
            "cardinality": "multi" if multi else "single",
            "required": required,
        }

    definitions = {
        "cmis:objectId": spec("id", required=True),
        "cmis:name": spec("string", required=True),
        "cmis:baseTypeId": spec("id", required=True),
        "cmis:objectTypeId": spec("id", required=True),
        "cmis:createdBy": spec("string"),
        "cmis:lastModifiedBy": spec("string"),
        "cmis:creationDate": spec("datetime"),
        "cmis:lastModificationDate": spec("datetime"),
        "cmis:changeToken": spec("string"),
        "cmis:secondaryObjectTypeIds": spec("id", multi=True),
        "cmis:path": spec("string"),
        "cmis:description": spec("string"),
        "kontextual:sensitivity": spec("string"),
        "kontextual:lifecycle": spec("string"),
    }

    if base_type_id == CMISBaseType.DOCUMENT:
        definitions.update(
            {
                "cmis:contentStreamLength": spec("integer"),
                "cmis:contentStreamMimeType": spec("string"),
                "cmis:contentStreamFileName": spec("string"),
                "cmis:contentStreamId": spec("id"),
                "kontextual:assetId": spec("id"),
                "kontextual:assetType": spec("string"),
                "kontextual:owner": spec("string"),
                "kontextual:topics": spec("string", multi=True),
                "kontextual:reviewState": spec("string"),
                "cmis:isImmutable": spec("boolean"),
                "cmis:isLatestVersion": spec("boolean"),
                "cmis:isMajorVersion": spec("boolean"),
                "cmis:isLatestMajorVersion": spec("boolean"),
                "cmis:versionLabel": spec("string"),
                "cmis:versionSeriesId": spec("id"),
                "cmis:isVersionSeriesCheckedOut": spec("boolean"),
                "cmis:isPrivateWorkingCopy": spec("boolean"),
                "cmis:versionSeriesCheckedOutBy": spec("string"),
                "cmis:versionSeriesCheckedOutId": spec("id"),
                "cmis:checkinComment": spec("string"),
                "kontextual:versionId": spec("id"),
                "kontextual:versionChangeType": spec("string"),
            }
        )
    if base_type_id == CMISBaseType.FOLDER:
        definitions["kontextual:filingSource"] = spec("string")
        definitions["kontextual:workspaceFolder"] = spec("boolean")
    if base_type_id == CMISBaseType.RELATIONSHIP:
        definitions["cmis:sourceId"] = spec("id", required=True)
        definitions["cmis:targetId"] = spec("id", required=True)
    return definitions


def _current_version(
    asset: KnowledgeAsset,
    versions: list[AssetVersion] | tuple[AssetVersion, ...],
) -> AssetVersion | None:
    """Pick the version that represents the asset's current state.

    The version whose id equals ``asset.current_version_id`` wins. When the
    asset names no current version, or none of ``versions`` matches it, the
    version with the highest ``sequence`` is used. Returns ``None`` when
    ``versions`` is empty.
    """
    wanted_id = asset.current_version_id
    if wanted_id:
        pinned = next(
            (candidate for candidate in versions if candidate.version_id == wanted_id),
            None,
        )
        if pinned is not None:
            return pinned
    if not versions:
        return None
    # Scanning in reverse makes the later entry win among equal sequences.
    return max(reversed(versions), key=lambda candidate: candidate.sequence)
def _normalize_path(path: str) -> str:
    """Return ``path`` as an absolute, forward-slash path.

    Backslashes count as separators, empty segments are dropped, and each
    remaining segment is cleaned with ``_safe_path_segment``. An empty
    path normalizes to ``"/"``.
    """
    unified = path.replace("\\", "/")
    segments = []
    for raw in unified.split("/"):
        if raw:
            segments.append(_safe_path_segment(raw))
    return "/" + "/".join(segments)


def _safe_path_segment(value: str) -> str:
    """Strip whitespace and slashes from a segment; empty becomes ``"_"``."""
    cleaned = str(value).strip().strip("/")
    return cleaned if cleaned else "_"


def _parent_path(path: str) -> str:
    """Return the normalized parent of ``path``.

    The root and any single-segment path have ``"/"`` as their parent.
    """
    segments = _normalize_path(path).strip("/").split("/")
    if len(segments) > 1:
        return "/" + "/".join(segments[:-1])
    return "/"


def _path_name(path: str) -> str:
    """Return the last segment of ``path``, or ``"root"`` for the root."""
    normalized = _normalize_path(path)
    return "root" if normalized == "/" else normalized.rsplit("/", 1)[-1]


def _filing_source(path: str) -> str:
    """Return the first segment of ``path``, or ``"root"`` for the root."""
    segments = _normalize_path(path).strip("/").split("/")
    first = segments[0] if segments else ""
    return first or "root"