projection-only multifiling

This commit is contained in:
2026-05-07 02:52:49 +02:00
parent 801d2f4851
commit 81e132b33b
9 changed files with 250 additions and 46 deletions

View File

@@ -346,26 +346,12 @@ class ServiceRuntime:
decision = mapper.access_point.decide_action(CMISAction.GET_CHILDREN, context)
if not decision.allowed:
raise _cmis_authorization_error(decision, "getChildren")
projections = [
projection.to_dict()
for asset in self.repository.list_assets()
if (
projection := mapper.map_asset(
asset,
context,
representations=self.repository.list_representations(asset_id=asset.id),
versions=self.repository.list_versions(asset.id),
relationship_ids=[
f"cmis:relationship:{relationship.relationship_id}"
for relationship in self.repository.list_relationships(source_id=asset.id)
],
metadata_records=self.repository.list_metadata_records(asset.id),
)
)
]
folder_path = _cmis_folder_path(folder_id)
projections = self._cmis_children_for_folder(mapper, context, folder_path=folder_path)
paged = projections[max(skip_count, 0) : max(skip_count, 0) + max(max_items, 0)]
return {
"folder_id": folder_id or mapper.access_point.root_folder_id,
"folder_path": folder_path or "/",
"objects": paged,
"num_items": len(paged),
"has_more_items": len(projections) > max(skip_count, 0) + len(paged),
@@ -441,6 +427,26 @@ class ServiceRuntime:
)
return acl
def cmis_object_parents(
self,
access_point_id: str,
object_id: str,
context: OperationContext,
) -> dict[str, Any]:
mapper = self._cmis_mapper(access_point_id)
decision = mapper.access_point.decide_action(CMISAction.GET_OBJECT_PARENTS, context, resource=object_id)
if not decision.allowed:
raise _cmis_authorization_error(decision, "getObjectParents")
asset_id = _cmis_asset_id(object_id)
asset = self.repository.get_asset(asset_id)
if not mapper.access_point.exposes_asset(asset, context):
raise NotFoundError(
"CMIS object not found",
details={"object_id": object_id, "access_point_id": access_point_id},
)
parents = list(mapper.parent_folders_for_asset(asset))
return {"object_id": mapper.asset_object_id(asset.id), "parents": parents, "count": len(parents)}
def cmis_create_document(
self,
access_point_id: str,
@@ -584,18 +590,14 @@ class ServiceRuntime:
"supported": ["SELECT * FROM cmis:document", "SELECT * FROM kontextual:document"],
},
)
children = self.cmis_children(
access_point_id,
context,
skip_count=skip_count,
max_items=max_items,
)
projections = self._cmis_document_projections(mapper, context)
paged = projections[max(skip_count, 0) : max(skip_count, 0) + max(max_items, 0)]
return {
"query": query,
"results": children["objects"],
"num_items": children["num_items"],
"has_more_items": children["has_more_items"],
"total_num_items": children["total_num_items"],
"results": paged,
"num_items": len(paged),
"has_more_items": len(projections) > max(skip_count, 0) + len(paged),
"total_num_items": len(projections),
}
def cmis_relationships(
@@ -685,6 +687,74 @@ class ServiceRuntime:
return self._cmis_asset_visible(mapper, relationship.target_id, context)
return True
def _cmis_children_for_folder(
self,
mapper: CMISDomainMapper,
context: OperationContext,
*,
folder_path: str | None,
) -> list[dict[str, Any]]:
assets = [
asset
for asset in self.repository.list_assets()
if mapper.access_point.exposes_asset(asset, context)
]
if folder_path in (None, "/"):
child_folder_paths = set()
for asset in assets:
for path in mapper.asset_paths(asset):
first = path.strip("/").split("/")[0]
if first:
child_folder_paths.add("/" + first)
return [mapper.folder_projection(path) for path in sorted(child_folder_paths)]
children: list[dict[str, Any]] = []
folder_path = _normalize_cmis_path(folder_path)
child_folder_paths: set[str] = set()
for asset in assets:
for path in mapper.asset_paths(asset):
parent = _path_parent(path)
if parent == folder_path:
projection = mapper.map_asset(
asset,
context,
representations=self.repository.list_representations(asset_id=asset.id),
versions=self.repository.list_versions(asset.id),
relationship_ids=[
f"cmis:relationship:{relationship.relationship_id}"
for relationship in self.repository.list_relationships(source_id=asset.id)
if self._cmis_relationship_visible(mapper, relationship, context)
],
metadata_records=self.repository.list_metadata_records(asset.id),
)
if projection is not None:
children.append(projection.to_dict())
elif _path_parent(parent) == folder_path:
child_folder_paths.add(parent)
return [mapper.folder_projection(path) for path in sorted(child_folder_paths)] + children
def _cmis_document_projections(
self,
mapper: CMISDomainMapper,
context: OperationContext,
) -> list[dict[str, Any]]:
projections = []
for asset in self.repository.list_assets():
projection = mapper.map_asset(
asset,
context,
representations=self.repository.list_representations(asset_id=asset.id),
versions=self.repository.list_versions(asset.id),
relationship_ids=[
f"cmis:relationship:{relationship.relationship_id}"
for relationship in self.repository.list_relationships(source_id=asset.id)
if self._cmis_relationship_visible(mapper, relationship, context)
],
metadata_records=self.repository.list_metadata_records(asset.id),
)
if projection is not None:
projections.append(projection.to_dict())
return projections
def create_asset(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]:
classification = Classification.from_dict(payload["classification"])
result = self.asset_service().create_asset(
@@ -2140,6 +2210,14 @@ def create_app(runtime: ServiceRuntime | None = None):
) -> dict[str, Any]:
return response(runtime.cmis_acl, access_point_id, object_id, context)
@app.get("/cmis/{access_point_id}/browser/parents/{object_id:path}", tags=["cmis"])
def cmis_object_parents(
access_point_id: str,
object_id: str,
context: OperationContext = Depends(context_from_headers),
) -> dict[str, Any]:
return response(runtime.cmis_object_parents, access_point_id, object_id, context)
@app.post("/cmis/{access_point_id}/browser/document", tags=["cmis"])
def cmis_create_document(
access_point_id: str,
@@ -2632,6 +2710,29 @@ def _cmis_asset_id(object_id: str | None) -> str:
return normalized
def _cmis_folder_path(folder_id: str | None) -> str | None:
if not folder_id:
return None
normalized = folder_id.strip()
if normalized in {"cmis-root", "root", "/"}:
return "/"
if normalized.startswith("cmis:folder:"):
return "/" + normalized.removeprefix("cmis:folder:").replace("::", "/")
return _normalize_cmis_path(normalized)
def _normalize_cmis_path(path: str) -> str:
parts = [part.strip().strip("/") for part in path.replace("\\", "/").split("/") if part.strip("/")]
return "/" + "/".join(parts)
def _path_parent(path: str) -> str:
parts = _normalize_cmis_path(path).strip("/").split("/")
if len(parts) <= 1:
return "/"
return "/" + "/".join(parts[:-1])
def _cmis_authorization_error(decision: PolicyDecision, operation: str) -> AuthorizationError:
return AuthorizationError(
"CMIS operation denied by access-point profile",

View File

@@ -48,6 +48,7 @@ class CMISAction(str, Enum):
GET_REPOSITORY_INFO = "get_repository_info"
GET_TYPE_DEFINITION = "get_type_definition"
GET_CHILDREN = "get_children"
GET_OBJECT_PARENTS = "get_object_parents"
GET_OBJECT = "get_object"
GET_CONTENT_STREAM = "get_content_stream"
GET_ACL = "get_acl"
@@ -76,6 +77,7 @@ ACTION_CAPABILITIES: dict[CMISAction, CMISCapability] = {
CMISAction.GET_REPOSITORY_INFO: CMISCapability.REPOSITORY,
CMISAction.GET_TYPE_DEFINITION: CMISCapability.TYPE_DEFINITIONS,
CMISAction.GET_CHILDREN: CMISCapability.NAVIGATION,
CMISAction.GET_OBJECT_PARENTS: CMISCapability.NAVIGATION,
CMISAction.GET_OBJECT: CMISCapability.OBJECT_READ,
CMISAction.GET_CONTENT_STREAM: CMISCapability.CONTENT_STREAM_READ,
CMISAction.GET_ACL: CMISCapability.ACL,
@@ -486,7 +488,7 @@ class CMISDomainMapper:
"capability_renditions": "read" if profile.has_capability(CMISCapability.RENDITIONS) else "none",
"capability_get_descendants": profile.has_capability(CMISCapability.NAVIGATION),
"capability_get_folder_tree": profile.has_capability(CMISCapability.NAVIGATION),
"capability_multifiling": False,
"capability_multifiling": profile.has_capability(CMISCapability.NAVIGATION),
"capability_unfiling": False,
"capability_version_specific_filing": False,
"capability_pwc_searchable": False,
@@ -613,21 +615,67 @@ class CMISDomainMapper:
def asset_object_id(self, asset_id: str) -> str:
return f"cmis:asset:{asset_id}"
def folder_object_id(self, path: str) -> str:
return "cmis:folder:" + _normalize_path(path).strip("/").replace("/", "::")
def asset_path(self, asset: KnowledgeAsset) -> str:
return self.asset_paths(asset)[0]
def asset_paths(self, asset: KnowledgeAsset) -> tuple[str, ...]:
paths: list[str] = []
explicit = asset.metadata.get("cmis_path")
if explicit:
return _normalize_path(str(explicit))
paths.append(_normalize_path(str(explicit)))
for value in asset.metadata.get("cmis_paths", ()):
paths.append(_normalize_path(str(value)))
if asset.source_refs:
source_ref = asset.source_refs[0]
source_root = _safe_path_segment(source_ref.source_system)
if source_ref.path:
return _normalize_path(f"/sources/{source_root}/{source_ref.path}")
paths.append(_normalize_path(f"/sources/{source_root}/{source_ref.path}"))
if source_ref.external_id:
return _normalize_path(f"/sources/{source_root}/{source_ref.external_id}")
topics = asset.classification.topics
if topics:
return _normalize_path(f"/topics/{topics[0]}/{asset.id}")
return _normalize_path(f"/assets/{asset.classification.asset_type}/{asset.id}")
paths.append(_normalize_path(f"/sources/{source_root}/{source_ref.external_id}"))
for topic in asset.classification.topics:
paths.append(_normalize_path(f"/topics/{topic}/{asset.id}"))
if asset.classification.owner:
paths.append(_normalize_path(f"/owners/{asset.classification.owner}/{asset.id}"))
paths.append(_normalize_path(f"/lifecycle/{_enum_value(asset.lifecycle)}/{asset.id}"))
paths.append(_normalize_path(f"/assets/{asset.classification.asset_type}/{asset.id}"))
return tuple(dict.fromkeys(paths))
def parent_folders_for_asset(self, asset: KnowledgeAsset) -> tuple[dict[str, Any], ...]:
folders = []
for path in self.asset_paths(asset):
parent = _parent_path(path)
folders.append(
{
"object_id": self.folder_object_id(parent),
"path": parent,
"name": _path_name(parent),
"base_type_id": CMISBaseType.FOLDER.value,
"type_id": "kontextual:folder",
"filing_source": _filing_source(parent),
}
)
return tuple({folder["path"]: folder for folder in folders}.values())
def folder_projection(self, path: str) -> dict[str, Any]:
normalized = _normalize_path(path)
return {
"object_id": self.folder_object_id(normalized),
"base_type_id": CMISBaseType.FOLDER.value,
"type_id": "kontextual:folder",
"name": _path_name(normalized),
"path": normalized,
"properties": {
"cmis:objectId": self.folder_object_id(normalized),
"cmis:name": _path_name(normalized),
"cmis:baseTypeId": CMISBaseType.FOLDER.value,
"cmis:objectTypeId": "kontextual:folder",
"kontextual:filingSource": _filing_source(normalized),
},
"allowable_actions": [CMISAction.GET_CHILDREN.value],
}
def asset_properties(
self,
@@ -716,6 +764,7 @@ class CMISDomainMapper:
CMISAction.GET_CONTENT_STREAM,
CMISAction.GET_ACL,
CMISAction.GET_RELATIONSHIPS,
CMISAction.GET_OBJECT_PARENTS,
CMISAction.UPDATE_PROPERTIES,
CMISAction.DELETE_OBJECT,
CMISAction.SET_CONTENT_STREAM,
@@ -819,3 +868,22 @@ def _normalize_path(path: str) -> str:
def _safe_path_segment(value: str) -> str:
return str(value).strip().strip("/") or "_"
def _parent_path(path: str) -> str:
parts = _normalize_path(path).strip("/").split("/")
if len(parts) <= 1:
return "/"
return "/" + "/".join(parts[:-1])
def _path_name(path: str) -> str:
normalized = _normalize_path(path)
if normalized == "/":
return "root"
return normalized.rsplit("/", 1)[-1]
def _filing_source(path: str) -> str:
parts = _normalize_path(path).strip("/").split("/")
return parts[0] if parts and parts[0] else "root"