CMIS authoring operations

This commit is contained in:
2026-05-07 01:46:44 +02:00
parent 7e168e93d3
commit e02f78d7e3
6 changed files with 282 additions and 1 deletions

View File

@@ -85,6 +85,26 @@ object reads, content stream descriptors, a constrained document query subset,
relationship objects, and audit-backed change entries. Unsupported query
grammar returns structured diagnostics.
## Governed Authoring Slice
The Browser Binding adapter now exposes selected mutation routes for profiles
that allow authoring:
- `POST /cmis/{access_point_id}/browser/document`
- `POST /cmis/{access_point_id}/browser/object/{object_id}/properties`
- `POST /cmis/{access_point_id}/browser/object/{object_id}/content`
- `POST /cmis/{access_point_id}/browser/object/{object_id}/delete`
These routes delegate to existing engine services:
- document creation uses `AssetRegistryService.create_asset`,
- property updates add governed metadata records,
- content stream updates add asset representations and content-change versions,
- delete requests transition the asset lifecycle to `delete_requested`.
Read-only profiles reject the same mutations with CMIS-shaped authorization
diagnostics before touching engine services.
Route-level tests are present but skip when the optional FastAPI/httpx service
dependencies are not installed. Runtime-level Browser Binding tests cover the
same behavior in the default Python test suite.

View File

@@ -16,6 +16,7 @@ from kontextual_engine.adapters.memory import InMemoryAssetRegistryRepository
from kontextual_engine.core import (
Actor,
ActorType,
AssetRepresentation,
AuditEvent,
AuditOutcome,
Classification,
@@ -33,6 +34,7 @@ from kontextual_engine.core import (
PolicyDecision,
PolicyEffect,
RelationshipTargetKind,
RepresentationKind,
RetrievalFeedbackLabel,
SourceReference,
TransformationRunStatus,
@@ -419,6 +421,127 @@ class ServiceRuntime:
)
return content_stream
def cmis_create_document(
self,
access_point_id: str,
payload: dict[str, Any],
context: OperationContext,
) -> dict[str, Any]:
mapper = self._cmis_mapper(access_point_id)
decision = mapper.access_point.decide_action(CMISAction.CREATE_DOCUMENT, context)
if not decision.allowed:
raise _cmis_authorization_error(decision, "createDocument")
classification = Classification.from_dict(
{
"asset_type": payload.get("asset_type", "document"),
"sensitivity": payload.get("sensitivity", "internal"),
"owner": payload.get("owner"),
"topics": payload.get("topics", []),
"metadata": dict(payload.get("classification_metadata", {})),
}
)
content = payload.get("content")
representations = []
if content is not None:
representations.append(
AssetRepresentation.from_content(
payload.get("asset_id") or "cmis-new-document",
RepresentationKind.SOURCE,
payload.get("media_type", "text/plain"),
content,
storage_ref=payload.get("storage_ref"),
)
)
result = self.asset_service().create_asset(
payload["name"],
classification,
context,
asset_id=payload.get("asset_id"),
representations=representations,
metadata_records=[_metadata_record(item) for item in payload.get("metadata_records", [])],
idempotency_key=payload.get("idempotency_key"),
)
return self.cmis_object(access_point_id, mapper.asset_object_id(result.asset.id), context)
def cmis_update_properties(
self,
access_point_id: str,
object_id: str,
payload: dict[str, Any],
context: OperationContext,
) -> dict[str, Any]:
mapper = self._cmis_mapper(access_point_id)
decision = mapper.access_point.decide_action(CMISAction.UPDATE_PROPERTIES, context, resource=object_id)
if not decision.allowed:
raise _cmis_authorization_error(decision, "updateProperties")
asset_id = _cmis_asset_id(object_id)
properties = dict(payload.get("properties", payload))
expected = properties.pop("expected_current_version_id", payload.get("expected_current_version_id", None))
for key, value in properties.items():
if key.startswith("cmis:"):
continue
self.asset_service().add_metadata_record(
asset_id,
MetadataRecord(key=_cmis_metadata_key(key), value=value, confirmed=bool(payload.get("confirmed", True))),
context,
expected_current_version_id=expected,
)
expected = None
return self.cmis_object(access_point_id, object_id, context)
def cmis_set_content_stream(
self,
access_point_id: str,
object_id: str,
payload: dict[str, Any],
context: OperationContext,
) -> dict[str, Any]:
mapper = self._cmis_mapper(access_point_id)
decision = mapper.access_point.decide_action(CMISAction.SET_CONTENT_STREAM, context, resource=object_id)
if not decision.allowed:
raise _cmis_authorization_error(decision, "setContentStream")
asset_id = _cmis_asset_id(object_id)
representation = AssetRepresentation.from_content(
asset_id,
payload.get("kind", RepresentationKind.SOURCE.value),
payload.get("media_type", "text/plain"),
payload.get("content", ""),
storage_ref=payload.get("storage_ref"),
)
self.asset_service().add_representation(
asset_id,
representation,
context,
expected_current_version_id=payload.get("expected_current_version_id"),
)
return self.cmis_object(access_point_id, object_id, context)
def cmis_delete_object(
self,
access_point_id: str,
object_id: str,
payload: dict[str, Any],
context: OperationContext,
) -> dict[str, Any]:
mapper = self._cmis_mapper(access_point_id)
decision = mapper.access_point.decide_action(CMISAction.DELETE_OBJECT, context, resource=object_id)
if not decision.allowed:
raise _cmis_authorization_error(decision, "deleteObject")
asset_id = _cmis_asset_id(object_id)
result = self.asset_service().request_delete(
asset_id,
context,
expected_current_version_id=payload.get("expected_current_version_id"),
)
return {
"object_id": mapper.asset_object_id(asset_id),
"deleted": False,
"lifecycle": result.asset.lifecycle.value,
"version": result.version.to_dict(),
"audit_event": result.audit_event.to_dict(),
"policy_decision": result.policy_decision.to_dict(),
}
def cmis_query(
self,
access_point_id: str,
@@ -1964,6 +2087,41 @@ def create_app(runtime: ServiceRuntime | None = None):
) -> dict[str, Any]:
return response(runtime.cmis_content_stream, access_point_id, object_id, context)
@app.post("/cmis/{access_point_id}/browser/document", tags=["cmis"])
def cmis_create_document(
access_point_id: str,
payload: dict[str, Any],
context: OperationContext = Depends(context_from_headers),
) -> dict[str, Any]:
return response(runtime.cmis_create_document, access_point_id, payload, context)
@app.post("/cmis/{access_point_id}/browser/object/{object_id:path}/properties", tags=["cmis"])
def cmis_update_properties(
access_point_id: str,
object_id: str,
payload: dict[str, Any],
context: OperationContext = Depends(context_from_headers),
) -> dict[str, Any]:
return response(runtime.cmis_update_properties, access_point_id, object_id, payload, context)
@app.post("/cmis/{access_point_id}/browser/object/{object_id:path}/content", tags=["cmis"])
def cmis_set_content_stream(
access_point_id: str,
object_id: str,
payload: dict[str, Any],
context: OperationContext = Depends(context_from_headers),
) -> dict[str, Any]:
return response(runtime.cmis_set_content_stream, access_point_id, object_id, payload, context)
@app.post("/cmis/{access_point_id}/browser/object/{object_id:path}/delete", tags=["cmis"])
def cmis_delete_object(
access_point_id: str,
object_id: str,
payload: dict[str, Any] | None = None,
context: OperationContext = Depends(context_from_headers),
) -> dict[str, Any]:
return response(runtime.cmis_delete_object, access_point_id, object_id, payload or {}, context)
@app.get("/cmis/{access_point_id}/browser/query", tags=["cmis"])
def cmis_query(
access_point_id: str,
@@ -2442,6 +2600,14 @@ def _cmis_change_type(operation: str) -> str:
return "security"
def _cmis_metadata_key(key: str) -> str:
if key.startswith("kontextual:metadata:"):
return key.removeprefix("kontextual:metadata:")
if key.startswith("kontextual:"):
return key.removeprefix("kontextual:")
return key
def _age_seconds(start: str, end: str) -> float:
try:
start_dt = datetime.fromisoformat(start.replace("Z", "+00:00"))

View File

@@ -162,3 +162,41 @@ def test_cmis_query_reports_unsupported_subset_diagnostics(cmis_client) -> None:
"SELECT * FROM cmis:document",
"SELECT * FROM kontextual:document",
]
def test_cmis_governed_authoring_routes_allow_selected_mutations(cmis_client) -> None:
created = cmis_client.post(
"/cmis/governed-authoring/browser/document",
json={
"asset_id": "asset-api-authored",
"name": "API Authored",
"content": "# API Authored",
"media_type": "text/markdown",
},
)
updated = cmis_client.post(
"/cmis/governed-authoring/browser/object/cmis:asset:asset-api-authored/properties",
json={"properties": {"kontextual:metadata:status": "draft"}},
)
streamed = cmis_client.post(
"/cmis/governed-authoring/browser/object/cmis:asset:asset-api-authored/content",
json={"content": "# Updated", "media_type": "text/markdown"},
)
deleted = cmis_client.post(
"/cmis/governed-authoring/browser/object/cmis:asset:asset-api-authored/delete",
json={},
)
assert created.status_code == 200
assert updated.json()["properties"]["kontextual:metadata:status"] == "draft"
assert streamed.json()["content_stream"]["mime_type"] == "text/markdown"
assert deleted.json()["lifecycle"] == "delete_requested"
def test_cmis_readonly_route_rejects_mutation(cmis_client) -> None:
response = cmis_client.post(
"/cmis/readonly-browser/browser/document",
json={"asset_id": "asset-api-readonly-denied", "name": "Denied"},
)
assert response.status_code == 403

View File

@@ -118,3 +118,58 @@ def test_runtime_cmis_browser_rejects_unsupported_query_subset(cmis_runtime) ->
)
assert "Unsupported CMIS query subset" in str(exc_info.value)
def test_runtime_cmis_governed_authoring_allows_selected_mutations(cmis_runtime) -> None:
runtime, context = cmis_runtime
created = runtime.cmis_create_document(
"governed-authoring",
{
"asset_id": "asset-authored",
"name": "Authored Through CMIS",
"sensitivity": "internal",
"topics": ["cmis"],
"content": "# Authored\n\nCreated through CMIS.",
"media_type": "text/markdown",
"metadata_records": [{"key": "status", "value": "draft", "confirmed": True}],
},
context,
)
updated = runtime.cmis_update_properties(
"governed-authoring",
"cmis:asset:asset-authored",
{"properties": {"kontextual:metadata:reviewer": "codex"}},
context,
)
streamed = runtime.cmis_set_content_stream(
"governed-authoring",
"cmis:asset:asset-authored",
{"content": "# Authored\n\nUpdated stream.", "media_type": "text/markdown"},
context,
)
deleted = runtime.cmis_delete_object(
"governed-authoring",
"cmis:asset:asset-authored",
{},
context,
)
assert created["object_id"] == "cmis:asset:asset-authored"
assert updated["properties"]["kontextual:metadata:reviewer"] == "codex"
assert streamed["content_stream"]["mime_type"] == "text/markdown"
assert deleted["deleted"] is False
assert deleted["lifecycle"] == "delete_requested"
def test_runtime_cmis_readonly_profile_rejects_mutations(cmis_runtime) -> None:
runtime, context = cmis_runtime
with pytest.raises(Exception) as exc_info:
runtime.cmis_create_document(
"readonly-browser",
{"asset_id": "asset-readonly-denied", "name": "Denied"},
context,
)
assert "CMIS operation denied" in str(exc_info.value)

View File

@@ -657,6 +657,8 @@ def test_service_health_readiness_version_and_openapi_contracts(client) -> None:
assert "/cmis" in paths
assert "/cmis/{access_point_id}/browser" in paths
assert "/cmis/{access_point_id}/browser/children" in paths
assert "/cmis/{access_point_id}/browser/document" in paths
assert "/cmis/{access_point_id}/browser/object/{object_id}/properties" in paths
assert "/api/v1/assets" in paths
assert "/api/v1/relationships" in paths
assert "/api/v1/audit/events" in paths

View File

@@ -109,7 +109,7 @@ Acceptance:
```task
id: KONT-WP-0012-T004
status: todo
status: done
priority: high
state_hub_task_id: "49716ca7-6a10-43ac-8ac5-ffa1c15b048e"
```