"""Runtime-level tests for the CMIS calls made against ``readonly-browser``."""

from __future__ import annotations

import pytest

from kontextual_engine import (
    AssetRepresentation,
    Classification,
    RepresentationKind,
    ServiceRuntime,
    Sensitivity,
)
from kontextual_engine.adapters.memory import InMemoryAssetRegistryRepository

pytestmark = pytest.mark.cmis

# Identifiers shared by the fixture and the tests below.
_ACCESS_POINT = "readonly-browser"
_SOURCE_ASSET_ID = "asset-runtime-source"
_SOURCE_OBJECT_ID = "cmis:asset:asset-runtime-source"
_PUBLIC_OBJECT_ID = "cmis:asset:asset-runtime-public"
_CONFIDENTIAL_OBJECT_ID = "cmis:asset:asset-runtime-confidential"


def _create_plain_document(runtime, context, asset_id, title, sensitivity):
    """Create a ``document`` asset through ``runtime.create_asset`` from a dict payload."""
    payload = {
        "asset_id": asset_id,
        "title": title,
        "classification": {"asset_type": "document", "sensitivity": sensitivity},
    }
    runtime.create_asset(payload, context)


@pytest.fixture
def cmis_runtime() -> tuple[ServiceRuntime, object]:
    """Return a seeded runtime together with the operation context used to seed it.

    The runtime is backed by ``InMemoryAssetRegistryRepository`` and holds:

    * ``asset-runtime-source`` -- internal document with one markdown source
      representation, created through the asset service;
    * ``asset-runtime-public`` and ``asset-runtime-confidential`` -- documents
      created from dict payloads;
    * one ``references`` relationship from the source asset to the public one.
    """
    runtime = ServiceRuntime(repository=InMemoryAssetRegistryRepository())
    context = runtime.operation_context(
        actor_id="cmis-runtime",
        correlation_id="corr-cmis-runtime",
    )

    # Source asset: built with the typed API so it can carry a representation.
    asset_service = runtime.asset_service()
    source_classification = Classification(
        asset_type="document",
        sensitivity=Sensitivity.INTERNAL,
        owner="Platform Knowledge",
        topics=("cmis",),
    )
    source_representation = AssetRepresentation.from_content(
        _SOURCE_ASSET_ID,
        RepresentationKind.SOURCE,
        "text/markdown",
        "# Runtime Source\n\nCMIS runtime fixture.",
        storage_ref="memory://asset-runtime-source/source",
    )
    asset_service.create_asset(
        "Runtime Source",
        source_classification,
        context,
        asset_id=_SOURCE_ASSET_ID,
        representations=[source_representation],
    )

    # Two further documents that differ only in id, title and sensitivity.
    _create_plain_document(
        runtime, context, "asset-runtime-public", "Runtime Public", "public"
    )
    _create_plain_document(
        runtime,
        context,
        "asset-runtime-confidential",
        "Runtime Confidential",
        "confidential",
    )

    relationship_payload = {
        "source_asset_id": _SOURCE_ASSET_ID,
        "target_id": "asset-runtime-public",
        "predicate": "references",
        "target_kind": "asset",
        "confidence": 0.99,
    }
    runtime.create_relationship(relationship_payload, context)

    return runtime, context


def test_runtime_cmis_browser_repository_types_children_and_object(cmis_runtime) -> None:
    """Check access points, repository info, type definitions, children and object lookup."""
    runtime, context = cmis_runtime

    # Issue every call first, then assert, so all lookups run before any check.
    access_point_listing = runtime.cmis_access_points()
    repository_info = runtime.cmis_repository_info(_ACCESS_POINT)
    type_definitions = runtime.cmis_type_definitions(_ACCESS_POINT)
    child_listing = runtime.cmis_children(_ACCESS_POINT, context)
    source_object = runtime.cmis_object(_ACCESS_POINT, _SOURCE_OBJECT_ID, context)

    assert access_point_listing["count"] == 4
    assert repository_info["repository_id"] == "kontextual-readonly-browser"
    assert repository_info["capabilities"]["capability_get_descendants"] is True

    base_type_ids = {entry["base_type_id"] for entry in type_definitions["items"]}
    assert {"cmis:document", "cmis:folder"} <= base_type_ids

    # The source and public assets must be listed; the confidential one must not.
    listed_object_ids = {entry["object_id"] for entry in child_listing["objects"]}
    assert {_SOURCE_OBJECT_ID, _PUBLIC_OBJECT_ID} <= listed_object_ids
    assert _CONFIDENTIAL_OBJECT_ID not in listed_object_ids

    assert source_object["properties"]["kontextual:assetId"] == _SOURCE_ASSET_ID


def test_runtime_cmis_browser_content_query_relationships_and_changes(cmis_runtime) -> None:
    """Check the content stream, document query, relationship listing and change log."""
    runtime, context = cmis_runtime

    content_stream = runtime.cmis_content_stream(_ACCESS_POINT, _SOURCE_OBJECT_ID, context)
    query_result = runtime.cmis_query(_ACCESS_POINT, "SELECT * FROM cmis:document", context)
    relationship_listing = runtime.cmis_relationships(
        _ACCESS_POINT,
        context,
        object_id=_SOURCE_OBJECT_ID,
    )
    change_log = runtime.cmis_change_log(_ACCESS_POINT, context)

    assert content_stream["mime_type"] in {"text/plain", "text/markdown"}
    assert query_result["total_num_items"] == 2

    assert relationship_listing["count"] == 1
    first_relationship = relationship_listing["items"][0]
    assert first_relationship["properties"]["cmis:targetId"] == _PUBLIC_OBJECT_ID

    assert change_log["total_num_items"] >= 3


def test_runtime_cmis_browser_rejects_unsupported_query_subset(cmis_runtime) -> None:
    """A query containing ``JOIN`` must raise with an 'Unsupported CMIS query subset' message."""
    runtime, context = cmis_runtime
    join_statement = "SELECT * FROM cmis:document JOIN cmis:relationship"

    with pytest.raises(Exception) as raised:
        runtime.cmis_query(_ACCESS_POINT, join_statement, context)

    assert "Unsupported CMIS query subset" in str(raised.value)