generated from coulomb/repo-seed
196 lines
6.9 KiB
Python
196 lines
6.9 KiB
Python
from __future__ import annotations
|
|
|
|
from kontextual_engine import (
|
|
Actor,
|
|
ActorType,
|
|
AssetRepresentation,
|
|
AssetVersion,
|
|
CMISAccessPoint,
|
|
CMISAccessProfile,
|
|
CMISAction,
|
|
CMISBaseType,
|
|
CMISDomainMapper,
|
|
Classification,
|
|
CoreRelationship,
|
|
KnowledgeAsset,
|
|
MetadataRecord,
|
|
OperationContext,
|
|
RelationshipTargetKind,
|
|
RepresentationKind,
|
|
SourceReference,
|
|
VersionChangeType,
|
|
)
|
|
|
|
|
|
def _context(actor_type: ActorType = ActorType.HUMAN) -> OperationContext:
|
|
return OperationContext.create(
|
|
Actor.create(actor_type, actor_id=f"actor-{actor_type.value}"),
|
|
correlation_id="corr-cmis-map",
|
|
)
|
|
|
|
|
|
def _mapper(profile: CMISAccessProfile | None = None) -> CMISDomainMapper:
|
|
return CMISDomainMapper(
|
|
CMISAccessPoint(
|
|
access_point_id="cmis-test",
|
|
repository_id="kontextual-test",
|
|
profile=profile or CMISAccessProfile.readonly_browser(),
|
|
base_path="/cmis/test/browser",
|
|
metadata={"repository_name": "Kontextual Test Repository"},
|
|
)
|
|
)
|
|
|
|
|
|
def _asset(sensitivity: str = "internal") -> KnowledgeAsset:
|
|
return KnowledgeAsset.create(
|
|
"Decision Record",
|
|
Classification(
|
|
asset_type="document",
|
|
sensitivity=sensitivity,
|
|
topics=("architecture", "cmis"),
|
|
owner="Platform Knowledge",
|
|
review_state="approved",
|
|
),
|
|
asset_id="asset-decision-record",
|
|
source_refs=[
|
|
SourceReference(
|
|
source_system="sharepoint",
|
|
path="Architecture/ADR 0001.md",
|
|
external_id="sp-adr-0001",
|
|
)
|
|
],
|
|
metadata={"file_name": "ADR 0001.md", "source_system": "sharepoint"},
|
|
)
|
|
|
|
|
|
def test_mapper_exposes_repository_info_capabilities_and_base_type_definitions() -> None:
|
|
mapper = _mapper()
|
|
repository = mapper.repository_info()
|
|
types = {definition["base_type_id"]: definition for definition in mapper.type_definitions()}
|
|
|
|
assert repository["repository_id"] == "kontextual-test"
|
|
assert repository["repository_name"] == "Kontextual Test Repository"
|
|
assert repository["cmis_version_supported"] == "1.1"
|
|
assert repository["binding"] == "browser"
|
|
assert repository["capabilities"]["capability_query"] == "metadataonly"
|
|
assert repository["capabilities"]["capability_multifiling"] is False
|
|
assert repository["unsupported_features"]["multifiling"]["status"] == "projection_only"
|
|
|
|
assert set(types) == {
|
|
"cmis:document",
|
|
"cmis:folder",
|
|
"cmis:relationship",
|
|
"cmis:policy",
|
|
"cmis:item",
|
|
"cmis:secondary",
|
|
}
|
|
assert types["cmis:document"]["property_definitions"]["cmis:objectId"]["required"] is True
|
|
|
|
|
|
def test_mapper_projects_asset_to_cmis_document_envelope() -> None:
|
|
mapper = _mapper()
|
|
asset = _asset()
|
|
representation = AssetRepresentation.from_content(
|
|
asset.id,
|
|
RepresentationKind.SOURCE,
|
|
"text/markdown",
|
|
"# Decision Record",
|
|
storage_ref="memory://asset-decision-record/source",
|
|
representation_id="repr-source",
|
|
)
|
|
version = AssetVersion(
|
|
asset_id=asset.id,
|
|
sequence=3,
|
|
change_type=VersionChangeType.CONTENT_CHANGED,
|
|
representation_ids=("repr-source",),
|
|
version_id="ver-current",
|
|
)
|
|
asset = asset.with_current_version(version.version_id)
|
|
projection = mapper.map_asset(
|
|
asset,
|
|
_context(),
|
|
representations=[representation],
|
|
versions=[version],
|
|
relationship_ids=["cmis:relationship:rel-derived"],
|
|
metadata_records=[MetadataRecord("status", "accepted", confirmed=True)],
|
|
)
|
|
|
|
assert projection is not None
|
|
serialized = projection.to_dict()
|
|
assert serialized["object_id"] == "cmis:asset:asset-decision-record"
|
|
assert serialized["base_type_id"] == CMISBaseType.DOCUMENT.value
|
|
assert serialized["path"] == "/sources/sharepoint/Architecture/ADR 0001.md"
|
|
assert serialized["properties"]["cmis:objectTypeId"] == "kontextual:document"
|
|
assert serialized["properties"]["kontextual:metadata:status"] == "accepted"
|
|
assert serialized["content_stream"]["mime_type"] == "text/markdown"
|
|
assert serialized["properties"]["cmis:contentStreamMimeType"] == "text/markdown"
|
|
assert serialized["properties"]["cmis:contentStreamId"] == "repr-source"
|
|
assert serialized["version"]["cmis:versionLabel"] == "3"
|
|
assert serialized["relationships"] == ["cmis:relationship:rel-derived"]
|
|
assert CMISAction.GET_CONTENT_STREAM.value in serialized["allowable_actions"]
|
|
assert CMISAction.UPDATE_PROPERTIES.value not in serialized["allowable_actions"]
|
|
|
|
|
|
def test_mapper_projects_multiple_parent_folders_without_duplicate_assets() -> None:
|
|
mapper = _mapper()
|
|
asset = _asset()
|
|
|
|
parents = mapper.parent_folders_for_asset(asset)
|
|
parent_paths = {parent["path"] for parent in parents}
|
|
|
|
assert "/sources/sharepoint/Architecture" in parent_paths
|
|
assert "/topics/architecture" in parent_paths
|
|
assert "/topics/cmis" in parent_paths
|
|
assert "/owners/Platform Knowledge" in parent_paths
|
|
assert "/lifecycle/active" in parent_paths
|
|
assert len(parent_paths) == len(parents)
|
|
|
|
|
|
def test_governed_authoring_projection_includes_write_allowable_actions() -> None:
|
|
mapper = _mapper(CMISAccessProfile.governed_authoring())
|
|
asset = _asset()
|
|
representation = AssetRepresentation.from_content(
|
|
asset.id,
|
|
RepresentationKind.NORMALIZED,
|
|
"application/json",
|
|
"{}",
|
|
)
|
|
|
|
projection = mapper.map_asset(asset, _context(), representations=[representation])
|
|
|
|
assert projection is not None
|
|
actions = {action.value for action in projection.allowable_actions}
|
|
assert {
|
|
CMISAction.UPDATE_PROPERTIES.value,
|
|
CMISAction.DELETE_OBJECT.value,
|
|
CMISAction.SET_CONTENT_STREAM.value,
|
|
} <= actions
|
|
|
|
|
|
def test_mapper_omits_assets_not_visible_through_profile() -> None:
|
|
mapper = _mapper(CMISAccessProfile.readonly_browser())
|
|
|
|
assert mapper.map_asset(_asset("confidential"), _context()) is None
|
|
|
|
|
|
def test_mapper_projects_relationship_objects() -> None:
|
|
mapper = _mapper()
|
|
relationship = CoreRelationship(
|
|
source_id="asset-source",
|
|
target_id="asset-target",
|
|
predicate="derived_from",
|
|
target_kind=RelationshipTargetKind.ASSET,
|
|
confidence=0.91,
|
|
relationship_id="rel-derived",
|
|
)
|
|
|
|
projection = mapper.map_relationship(relationship, _context())
|
|
|
|
assert projection is not None
|
|
serialized = projection.to_dict()
|
|
assert serialized["object_id"] == "cmis:relationship:rel-derived"
|
|
assert serialized["base_type_id"] == CMISBaseType.RELATIONSHIP.value
|
|
assert serialized["properties"]["cmis:sourceId"] == "cmis:asset:asset-source"
|
|
assert serialized["properties"]["cmis:targetId"] == "cmis:asset:asset-target"
|
|
assert serialized["properties"]["kontextual:predicate"] == "derived_from"
|