More CMIS scoring optimization

This commit is contained in:
2026-05-13 23:42:56 +02:00
parent 852b45c158
commit b11c9189e4
7 changed files with 775 additions and 52 deletions

View File

@@ -20,6 +20,7 @@ from kontextual_engine.core import (
Actor,
ActorType,
AssetRepresentation,
AssetVersion,
AuditEvent,
AuditOutcome,
Classification,
@@ -42,6 +43,7 @@ from kontextual_engine.core import (
RetrievalFeedbackLabel,
SourceReference,
TransformationRunStatus,
VersionChangeType,
WorkflowExceptionKind,
WorkflowExceptionStatus,
WorkflowInputDefinition,
@@ -734,7 +736,17 @@ class ServiceRuntime:
"CMIS object not found",
details={"object_id": object_id, "access_point_id": access_point_id},
)
if asset.metadata.get("cmis_content_deleted") or not self._cmis_asset_representations(asset):
if asset.metadata.get("cmis_content_deleted"):
raise ValidationError(
"CMIS document has no content stream",
details={
"code": "cmis.no_content_stream",
"cmis_exception": "constraint",
"object_id": object_id,
"access_point_id": access_point_id,
},
)
if not self._cmis_asset_representations(asset):
digest = content_digest(b"")
representation = AssetRepresentation(
asset_id=asset_id,
@@ -984,6 +996,206 @@ class ServiceRuntime:
payload["folder_id"] = parent_folder_id
return cmis_browser_object(self.cmis_create_document(access_point_id, payload, context))
def cmis_create_document_from_source(
    self,
    access_point_id: str,
    payload: dict[str, Any],
    context: OperationContext,
) -> dict[str, Any]:
    """Create a new document asset as a copy of an existing CMIS document.

    The source object is named by ``sourceId`` (aliases ``source_id`` and
    ``sourceObjectId``) in ``payload``. Its classification, source references
    and representations are copied onto a new asset; every copied
    representation gets a fresh id and records the source object and source
    representation it was derived from.

    Source metadata is carried over except for the folder-placement keys
    (``cmis_path``, ``cmis_paths``, ``cmis_parent_folder_id``). When the
    payload names a target folder (``folder_id`` / ``folderId``) the
    placement keys are recomputed for that folder.

    Args:
        access_point_id: CMIS access point the request came through.
        payload: Browser-binding style request payload.
        context: Operation context used for policy decisions and auditing.

    Returns:
        The result of ``self.cmis_object`` for the newly created asset.

    Raises:
        AuthorizationError: The access point denies ``CREATE_DOCUMENT``.
        ValidationError: Unsupported ``cmis:objectTypeId``, missing source
            id, or an empty resulting name.
        NotFoundError: The source asset is not exposed by the access point.
    """
    mapper = self._cmis_mapper(access_point_id)
    decision = mapper.access_point.decide_action(CMISAction.CREATE_DOCUMENT, context)
    if not decision.allowed:
        raise _cmis_authorization_error(decision, "createDocumentFromSource")

    properties = _cmis_browser_properties(payload)
    type_id = properties.get("cmis:objectTypeId")
    if type_id and type_id not in {CMISBaseType.DOCUMENT.value, "kontextual:document"}:
        raise ValidationError(
            "Invalid CMIS source-copy document type",
            details={"operation": "createDocumentFromSource", "type_id": type_id},
        )

    source_object_id = payload.get("sourceId") or payload.get("source_id") or payload.get("sourceObjectId")
    if not source_object_id:
        raise ValidationError(
            "CMIS source object id is required",
            details={"operation": "createDocumentFromSource", "parameter": "sourceId"},
        )
    source_asset_id = _cmis_asset_id(str(source_object_id))
    source_asset = self.repository.get_asset(source_asset_id)
    if not mapper.access_point.exposes_asset(source_asset, context):
        raise NotFoundError(
            "CMIS source object not found",
            details={"object_id": source_object_id, "access_point_id": access_point_id},
        )

    # Explicit cmis:name wins, then a plain "name", then the source title.
    name = str(properties.get("cmis:name") or payload.get("name") or source_asset.title).strip()
    if not name:
        raise ValidationError("CMIS name cannot be empty", details={"operation": "createDocumentFromSource"})

    asset_id = payload.get("asset_id") or new_id("asset")
    representations = [
        replace(
            representation,
            asset_id=asset_id,
            representation_id=new_id("repr"),
            producer="cmis-createDocumentFromSource",
            metadata={
                **representation.metadata,
                "cmis_source_object_id": str(source_object_id),
                "cmis_source_representation_id": representation.representation_id,
            },
        )
        for representation in self._cmis_asset_representations(source_asset)
    ]
    result = self.asset_service().create_asset(
        name,
        source_asset.classification,
        context,
        asset_id=asset_id,
        source_refs=list(source_asset.source_refs),
        representations=representations,
        idempotency_key=payload.get("idempotency_key"),
    )

    # Folder placement belongs to the source object, not to the copy.
    placement_keys = {"cmis_path", "cmis_paths", "cmis_parent_folder_id"}
    metadata = {
        key: value
        for key, value in source_asset.metadata.items()
        if key not in placement_keys and key != "cmis_content_deleted"
    }
    # Content-stream deletion is tracked only by the ``cmis_content_deleted``
    # metadata flag; the representations above are copied regardless. A copy
    # of a content-less document must stay content-less, so carry the flag
    # over instead of silently resurfacing the deleted content.
    if source_asset.metadata.get("cmis_content_deleted"):
        metadata["cmis_content_deleted"] = True
    metadata.update(
        {
            "cmis_copied_from_object_id": str(source_object_id),
            "cmis_copied_from_asset_id": source_asset.id,
            "file_name": name,
        }
    )
    if "cmis:description" in properties:
        metadata["description"] = str(properties.get("cmis:description") or "")
    if "cmis:secondaryObjectTypeIds" in properties:
        metadata["cmis_secondary_object_type_ids"] = _cmis_value_list(properties.get("cmis:secondaryObjectTypeIds"))

    folder_id = payload.get("folder_id") or payload.get("folderId")
    if folder_id:
        folder_path = _cmis_folder_path(folder_id) or "/"
        metadata["cmis_path"] = _normalize_cmis_path(f"{folder_path}/{name}")
        metadata["cmis_parent_folder_id"] = "cmis-root" if folder_path == "/" else mapper.folder_object_id(folder_path)

    self.repository.save_asset(replace(result.asset, metadata=metadata))
    return self.cmis_object(access_point_id, mapper.asset_object_id(result.asset.id), context)
def cmis_browser_create_document_from_source(
    self,
    access_point_id: str,
    payload: dict[str, Any],
    context: OperationContext,
    *,
    parent_folder_id: str | None = None,
) -> dict[str, Any]:
    """Browser-binding wrapper around ``cmis_create_document_from_source``.

    Works on a shallow copy of ``payload`` so the caller's dict is never
    mutated. ``parent_folder_id`` is used as the target folder only when the
    payload names no folder itself (neither ``folder_id`` nor ``folderId``).
    The created object is returned after passing through
    ``cmis_browser_object``.
    """
    request_payload = dict(payload)
    names_folder = "folder_id" in request_payload or "folderId" in request_payload
    if parent_folder_id and not names_folder:
        request_payload["folder_id"] = parent_folder_id
    created = self.cmis_create_document_from_source(access_point_id, request_payload, context)
    return cmis_browser_object(created)
def cmis_bulk_update_properties(
    self,
    access_point_id: str,
    payload: dict[str, Any],
    context: OperationContext,
) -> list[dict[str, Any]]:
    """Apply one set of properties to several CMIS objects.

    Target objects (and optional per-object change tokens) are extracted
    from ``payload`` with ``_cmis_browser_bulk_entries``; the properties to
    apply come from ``_cmis_browser_properties``. Each target is updated in
    turn through ``cmis_update_properties``; an exception raised by one
    update propagates and stops the loop.

    Returns:
        One dict per target, in processing order, with the requested id
        (``id``) and the ``cmis:objectId`` / ``cmis:changeToken`` entries of
        the updated object's properties (``newId`` / ``changeToken``).

    Raises:
        AuthorizationError: The access point denies ``BULK_UPDATE_PROPERTIES``.
        ValidationError: No object ids or no properties were supplied.
    """
    mapper = self._cmis_mapper(access_point_id)
    decision = mapper.access_point.decide_action(CMISAction.BULK_UPDATE_PROPERTIES, context)
    if not decision.allowed:
        raise _cmis_authorization_error(decision, "bulkUpdateProperties")

    targets = _cmis_browser_bulk_entries(payload)
    if not targets:
        raise ValidationError(
            "CMIS bulk update object ids are required",
            details={"operation": "bulkUpdateProperties", "parameter": "objectId"},
        )

    shared_properties = _cmis_browser_properties(payload)
    if not shared_properties:
        raise ValidationError(
            "CMIS bulk update properties are required",
            details={"operation": "bulkUpdateProperties", "parameter": "propertyId"},
        )

    results: list[dict[str, Any]] = []
    for target in targets:
        # Each object gets its own copy so one update cannot affect the next.
        request: dict[str, Any] = {"properties": dict(shared_properties)}
        token = target.get("change_token")
        if token:
            request["expected_current_version_id"] = token
        outcome = self.cmis_update_properties(access_point_id, target["object_id"], request, context)
        outcome_properties = outcome.get("properties", {})
        new_id_value = outcome_properties.get("cmis:objectId")
        new_token_value = outcome_properties.get("cmis:changeToken")
        results.append(
            {
                "id": target["object_id"],
                "newId": new_id_value,
                "changeToken": new_token_value,
            }
        )
    return results
def _cmis_expected_change_token(self, payload: dict[str, Any], properties: dict[str, Any]) -> str | None:
return properties.pop(
"expected_current_version_id",
payload.get("expected_current_version_id") or payload.get("changeToken") or payload.get("change_token"),
)
def _cmis_assert_change_token(
self,
asset,
expected_current_version_id: str | None,
*,
operation: str,
) -> None:
if not expected_current_version_id:
return
if asset.current_version_id != expected_current_version_id:
raise ValidationError(
"CMIS change token conflict",
details={
"code": "asset.version_conflict",
"cmis_exception": "updateConflict",
"operation": operation,
"asset_id": asset.id,
"expected_current_version_id": expected_current_version_id,
"current_version_id": asset.current_version_id,
},
)
def _cmis_record_asset_version(
    self,
    asset,
    context: OperationContext,
    *,
    change_type: VersionChangeType,
    operation_id: str,
    metadata_delta: dict[str, Any] | None = None,
    representation_ids: tuple[str, ...] = (),
):
    """Append a new version to ``asset`` and persist the bookkeeping.

    The new version's sequence is one more than the highest sequence
    currently stored for the asset (1 when there are none) and its parent is
    the asset's current version. The actor, the re-pointed asset, the version
    and a success audit event are then saved, in that order.

    Returns:
        The asset as returned by ``asset.with_current_version`` for the new
        version id.
    """
    known_sequences = [existing.sequence for existing in self.repository.list_versions(asset.id)]
    sequence = max(known_sequences, default=0) + 1
    delta = dict(metadata_delta or {})
    new_version = AssetVersion(
        asset_id=asset.id,
        sequence=sequence,
        change_type=change_type,
        representation_ids=representation_ids,
        actor_id=context.actor.id,
        operation_id=operation_id,
        parent_version_id=asset.current_version_id,
        metadata_delta=delta,
        lifecycle=asset.lifecycle.value,
    )
    repointed = asset.with_current_version(new_version.version_id)

    # Write order is kept exactly: actor, asset, version, audit event.
    self.repository.save_actor(context.actor)
    self.repository.save_asset(repointed)
    self.repository.save_version(new_version)
    audit_event = AuditEvent.from_context(
        operation_id,
        f"asset:{asset.id}",
        AuditOutcome.SUCCESS,
        context,
        details={"version_id": new_version.version_id},
    )
    self.repository.save_audit_event(audit_event)
    return repointed
def cmis_update_properties(
self,
access_point_id: str,
@@ -996,7 +1208,7 @@ class ServiceRuntime:
if not decision.allowed:
raise _cmis_authorization_error(decision, "updateProperties")
properties = dict(payload.get("properties", payload))
expected = properties.pop("expected_current_version_id", payload.get("expected_current_version_id", None))
expected = self._cmis_expected_change_token(payload, properties)
if object_id.startswith("cmis:folder:"):
return self._cmis_update_workspace_folder(mapper, object_id, properties, context)
asset_id = _cmis_asset_id(object_id)
@@ -1038,6 +1250,17 @@ class ServiceRuntime:
expected = None
if asset_metadata_updates or title_update is not None:
asset = self.repository.get_asset(asset_id)
self._cmis_assert_change_token(asset, expected, operation="updateProperties")
metadata_delta = dict(asset_metadata_updates)
if title_update is not None:
metadata_delta["title"] = title_update
asset = self._cmis_record_asset_version(
asset,
context,
change_type=VersionChangeType.METADATA_CHANGED,
operation_id="cmis.updateProperties",
metadata_delta=metadata_delta,
)
metadata = {**asset.metadata, **asset_metadata_updates}
if title_update is not None and asset.metadata.get("cmis_path"):
current_path = _normalize_cmis_path(str(asset.metadata["cmis_path"]))
@@ -1061,6 +1284,11 @@ class ServiceRuntime:
if not decision.allowed:
raise _cmis_authorization_error(decision, "setContentStream")
asset_id = _cmis_asset_id(object_id)
expected = (
payload.get("expected_current_version_id")
or payload.get("changeToken")
or payload.get("change_token")
)
asset = self.repository.get_asset(asset_id)
if asset.metadata.get("cmis_content_deleted"):
metadata = dict(asset.metadata)
@@ -1072,7 +1300,7 @@ class ServiceRuntime:
_cmis_media_type(payload.get("media_type", "text/plain")),
payload.get("content", ""),
context,
expected_current_version_id=payload.get("expected_current_version_id"),
expected_current_version_id=expected,
metadata={"cmis": {"operation": "setContentStream"}},
)
return self.cmis_object(access_point_id, object_id, context)
@@ -1082,11 +1310,13 @@ class ServiceRuntime:
access_point_id: str,
object_id: str,
context: OperationContext,
payload: dict[str, Any] | None = None,
) -> dict[str, Any]:
mapper = self._cmis_mapper(access_point_id)
decision = mapper.access_point.decide_action(CMISAction.SET_CONTENT_STREAM, context, resource=object_id)
if not decision.allowed:
raise _cmis_authorization_error(decision, "deleteContentStream")
payload = payload or {}
asset_id = _cmis_asset_id(object_id)
asset = self.repository.get_asset(asset_id)
if not mapper.access_point.exposes_asset(asset, context):
@@ -1094,6 +1324,20 @@ class ServiceRuntime:
"CMIS object not found",
details={"object_id": object_id, "access_point_id": access_point_id},
)
expected = (
payload.get("expected_current_version_id")
or payload.get("changeToken")
or payload.get("change_token")
)
self._cmis_assert_change_token(asset, expected, operation="deleteContentStream")
asset = self._cmis_record_asset_version(
asset,
context,
change_type=VersionChangeType.CONTENT_CHANGED,
operation_id="cmis.deleteContentStream",
metadata_delta={"cmis_content_deleted": True},
representation_ids=tuple(representation.representation_id for representation in self._cmis_asset_representations(asset)),
)
self.repository.save_asset(replace(asset, metadata={**asset.metadata, "cmis_content_deleted": True}))
return self.cmis_object(access_point_id, object_id, context)
@@ -3067,22 +3311,86 @@ def create_app(runtime: ServiceRuntime | None = None):
)
app.state.kontextual_runtime = runtime
def _is_cmis_request(request: Request) -> bool:
    """Tell whether the request's URL path starts with ``/cmis/``."""
    request_path = str(request.url.path)
    return request_path.startswith("/cmis/")
def _cmis_error_response(status_code: int, payload: dict[str, Any]) -> JSONResponse:
    """Build a CMIS-style JSON error response from an error payload.

    The CMIS exception name is taken from ``details["cmis_exception"]`` when
    present; otherwise it is inferred from the error code, the incoming HTTP
    status and, as a last resort, the message text. Known exception names
    always determine the HTTP status. An unknown explicit name keeps the
    incoming status, except that 422 is reported as 400.

    Server-side failures (5xx) are reported as ``runtime`` with their status
    preserved and 405 as ``notSupported``, rather than being collapsed into
    a 400 ``invalidArgument`` client error.

    Args:
        status_code: HTTP status suggested by the calling handler.
        payload: Error payload; ``message``/``detail``, ``code`` and
            ``details`` are read from it.

    Returns:
        A ``JSONResponse`` whose body has ``exception``, ``message``,
        ``code`` and ``details``.
    """
    status_by_exception = {
        "objectNotFound": 404,
        "permissionDenied": 403,
        "updateConflict": 409,
        "notSupported": 405,
        "constraint": 409,
    }

    raw_details = payload.get("details")
    details = dict(raw_details) if isinstance(raw_details, dict) else {}
    message = str(payload.get("message") or payload.get("detail") or "CMIS request failed")
    code = str(details.get("code") or payload.get("code") or "kontextual.cmis")
    cmis_exception = details.get("cmis_exception")
    resolved_status = status_code

    if cmis_exception:
        # isinstance guard: an unhashable value must not break the lookup.
        known_status = status_by_exception.get(cmis_exception) if isinstance(cmis_exception, str) else None
        if known_status is not None:
            resolved_status = known_status
        elif status_code == 422:
            resolved_status = 400
    else:
        lowered = message.lower()
        if code == "kontextual.not_found" or status_code == 404:
            cmis_exception = "objectNotFound"
        elif code == "kontextual.authorization" or status_code == 403:
            cmis_exception = "permissionDenied"
        elif code == "asset.version_conflict":
            cmis_exception = "updateConflict"
        elif (
            code == "cmis.not_supported"
            or lowered.startswith("unsupported cmis browser binding action")
            or status_code == 405
        ):
            cmis_exception = "notSupported"
        elif "path already exists" in lowered or "cannot be moved" in lowered:
            cmis_exception = "constraint"
        elif status_code >= 500:
            # A server fault is not the client's invalid argument.
            cmis_exception = "runtime"
        else:
            cmis_exception = "invalidArgument"
            resolved_status = 400
        resolved_status = status_by_exception.get(cmis_exception, resolved_status)

    content = {
        "exception": cmis_exception,
        "message": message,
        "code": code,
        "details": details,
    }
    return JSONResponse(status_code=resolved_status, content=content)
@app.exception_handler(NotFoundError)
async def not_found_error_handler(_request, exc: NotFoundError) -> JSONResponse:
    """Answer ``NotFoundError`` with 404; CMIS paths get the CMIS error body."""
    body = _error_payload(exc)
    if not _is_cmis_request(_request):
        return JSONResponse(status_code=404, content=body)
    return _cmis_error_response(404, body)
@app.exception_handler(AuthorizationError)
async def authorization_error_handler(_request, exc: AuthorizationError) -> JSONResponse:
    """Answer ``AuthorizationError`` with 403; CMIS paths get the CMIS error body."""
    body = _authorization_error_payload(exc)
    if not _is_cmis_request(_request):
        return JSONResponse(status_code=403, content=body)
    return _cmis_error_response(403, body)
@app.exception_handler(ValidationError)
async def validation_error_handler(_request, exc: ValidationError) -> JSONResponse:
    """Answer ``ValidationError`` with 422, or hand CMIS paths to ``_cmis_error_response``."""
    body = _error_payload(exc)
    if not _is_cmis_request(_request):
        return JSONResponse(status_code=422, content=body)
    return _cmis_error_response(422, body)
@app.exception_handler(KontextualError)
async def kontextual_error_handler(_request, exc: KontextualError) -> JSONResponse:
    """Answer ``KontextualError`` with 400, or hand CMIS paths to ``_cmis_error_response``."""
    body = _error_payload(exc)
    if not _is_cmis_request(_request):
        return JSONResponse(status_code=400, content=body)
    return _cmis_error_response(400, body)
@app.exception_handler(HTTPException)
async def http_exception_handler(_request, exc: HTTPException) -> JSONResponse:
    """Render ``HTTPException`` as JSON, keeping any headers it carries.

    CMIS paths get the CMIS error body built by ``_cmis_error_response``; a
    non-dict ``detail`` is wrapped as ``{"message": str(detail)}`` first.
    Other paths get ``{"detail": exc.detail}`` with the exception's status.

    Headers attached to the exception (for example an authentication
    challenge) are forwarded on both paths instead of being dropped.
    """
    extra_headers = getattr(exc, "headers", None)
    if _is_cmis_request(_request):
        detail = exc.detail if isinstance(exc.detail, dict) else {"message": str(exc.detail)}
        cmis_response = _cmis_error_response(exc.status_code, detail)
        for header_name, header_value in (extra_headers or {}).items():
            cmis_response.headers[header_name] = header_value
        return cmis_response
    return JSONResponse(
        status_code=exc.status_code,
        content={"detail": exc.detail},
        headers=extra_headers,
    )
@app.get("/health", tags=["system"])
def health() -> dict[str, Any]:
    """Return whatever ``runtime.health()`` reports."""
    report = runtime.health()
    return report
@@ -3208,7 +3516,7 @@ def create_app(runtime: ServiceRuntime | None = None):
length = max(end - offset + 1, 0)
start = max(offset or 0, 0)
requested_length = None if length is None else max(length, 0)
is_partial = offset is not None or length is not None or bool(range_header)
is_partial = start > 0 or requested_length is not None
content_length = max(representation.size_bytes - start, 0)
if requested_length is not None:
content_length = min(content_length, requested_length)
@@ -3337,6 +3645,14 @@ def create_app(runtime: ServiceRuntime | None = None):
context,
parent_folder_id=object_id or payload.get("folderId") or payload.get("folder_id") or "cmis-root",
)
if action in {"createDocumentFromSource", "copy"}:
return response(
runtime.cmis_browser_create_document_from_source,
access_point_id,
payload,
context,
parent_folder_id=object_id or payload.get("folderId") or payload.get("folder_id") or "cmis-root",
)
if action in {"delete", "deleteObject"}:
if not object_id:
raise ValidationError("CMIS object id is required", details={"operation": "deleteObject"})
@@ -3363,8 +3679,10 @@ def create_app(runtime: ServiceRuntime | None = None):
if action in {"deleteContent", "deleteContentStream"}:
if not object_id:
raise ValidationError("CMIS object id is required", details={"operation": "deleteContentStream"})
response(runtime.cmis_delete_content_stream, access_point_id, object_id, context)
response(runtime.cmis_delete_content_stream, access_point_id, object_id, context, payload)
return response(runtime.cmis_browser_object, access_point_id, object_id, context)
if action in {"bulkUpdateProperties", "bulkUpdate"}:
return response(runtime.cmis_bulk_update_properties, access_point_id, payload, context)
raise ValidationError(
"Unsupported CMIS Browser Binding action",
details={
@@ -3372,6 +3690,7 @@ def create_app(runtime: ServiceRuntime | None = None):
"supported": [
"createFolder",
"createDocument",
"createDocumentFromSource",
"delete",
"deleteObject",
"deleteTree",
@@ -4408,6 +4727,28 @@ def _cmis_browser_properties(payload: dict[str, Any]) -> dict[str, Any]:
return properties
def _cmis_browser_bulk_entries(payload: dict[str, Any]) -> list[dict[str, str | None]]:
object_ids: dict[str, str] = {}
change_tokens: dict[str, str | None] = {}
for key, value in payload.items():
if key.startswith("objectId["):
index = key[len("objectId[") :].split("]", 1)[0]
object_ids[index] = str(value)
elif key.startswith("changeToken["):
index = key[len("changeToken[") :].split("]", 1)[0]
token = str(value)
change_tokens[index] = token or None
def sort_key(index: str) -> tuple[int, str]:
return (int(index), index) if index.isdigit() else (10_000, index)
return [
{"object_id": object_ids[index], "change_token": change_tokens.get(index)}
for index in sorted(object_ids, key=sort_key)
if object_ids[index]
]
def _cmis_authorization_error(decision: PolicyDecision, operation: str) -> AuthorizationError:
return AuthorizationError(
"CMIS operation denied by access-point profile",