# File: kontextual-engine/tests/cmis/test_cmis_access_profiles.py (159 lines, 5.6 KiB, Python)

from __future__ import annotations
from kontextual_engine import (
Actor,
ActorType,
CMISAccessPoint,
CMISAccessProfile,
CMISAction,
CMISCapability,
Classification,
KnowledgeAsset,
OperationContext,
)
def _context(actor_type: ActorType = ActorType.HUMAN) -> OperationContext:
    """Build an operation context for an actor of the given type.

    The actor id is derived from the actor type's value (``actor-<value>``),
    and every context produced here carries the same fixed correlation id,
    ``corr-cmis-profile``.
    """
    actor = Actor.create(actor_type, actor_id=f"actor-{actor_type.value}")
    return OperationContext.create(actor, correlation_id="corr-cmis-profile")
def _asset(
    asset_id: str,
    sensitivity: str,
    *,
    asset_type: str = "document",
    topics: tuple[str, ...] = ("cmis",),
    metadata: dict | None = None,
) -> KnowledgeAsset:
    """Create a knowledge asset labelled ``Asset <asset_id>`` for a test.

    Args:
        asset_id: Identifier given to the asset and embedded in its label.
        sensitivity: Sensitivity value placed on the asset's classification.
        asset_type: Asset type placed on the classification.
        topics: Topics placed on the classification.
        metadata: Optional metadata forwarded unchanged to the asset factory.

    Returns:
        The asset returned by ``KnowledgeAsset.create``.
    """
    label = f"Asset {asset_id}"
    classification = Classification(
        asset_type=asset_type,
        sensitivity=sensitivity,
        topics=topics,
    )
    return KnowledgeAsset.create(
        label,
        classification,
        asset_id=asset_id,
        metadata=metadata,
    )
def test_readonly_profile_exposes_read_capabilities_and_rejects_mutations() -> None:
    """The read-only browser profile permits reads and refuses document creation."""
    readonly = CMISAccessProfile.readonly_browser()
    ctx = _context()

    # Both read-side capabilities must be advertised by the profile.
    for capability in (
        CMISCapability.REPOSITORY,
        CMISCapability.CONTENT_STREAM_READ,
    ):
        assert readonly.has_capability(capability)

    read_decision = readonly.decide_action(CMISAction.GET_OBJECT, ctx)
    assert read_decision.allowed is True

    # A write action is denied with the dedicated mutation reason code.
    create_decision = readonly.decide_action(CMISAction.CREATE_DOCUMENT, ctx)
    assert create_decision.allowed is False
    assert create_decision.reason == "cmis_mutation_not_allowed"
def test_governed_authoring_profile_allows_selected_write_actions() -> None:
    """Governed authoring allows create/set-content but not bulk update or ACLs."""
    authoring = CMISAccessProfile.governed_authoring()
    ctx = _context()

    create_decision = authoring.decide_action(CMISAction.CREATE_DOCUMENT, ctx)
    assert create_decision.allowed is True

    stream_decision = authoring.decide_action(CMISAction.SET_CONTENT_STREAM, ctx)
    assert stream_decision.allowed is True

    bulk_decision = authoring.decide_action(CMISAction.BULK_UPDATE_PROPERTIES, ctx)
    assert bulk_decision.allowed is False

    # Applying an ACL is reported with the "not implemented" reason code.
    acl_decision = authoring.decide_action(CMISAction.APPLY_ACL, ctx)
    assert acl_decision.reason == "cmis_operation_not_implemented"
def test_compat_tck_profile_allows_bulk_update_for_browser_binding_maturity() -> None:
    """The TCK compatibility profile permits the bulk property update action."""
    tck_profile = CMISAccessProfile.compat_tck()
    ctx = _context()

    bulk_decision = tck_profile.decide_action(CMISAction.BULK_UPDATE_PROPERTIES, ctx)
    assert bulk_decision.allowed is True
def test_profiles_hide_denied_sensitivities_without_partial_exposure() -> None:
    """The read-only profile exposes a public asset and denies a confidential one."""
    readonly = CMISAccessProfile.readonly_browser()
    ctx = _context()
    public_asset = _asset("asset-public", "public")
    confidential_asset = _asset("asset-confidential", "confidential")

    assert readonly.exposes_asset(public_asset, ctx) is True

    # The denial carries both a reason code and the offending sensitivity.
    verdict = readonly.decide_asset_visibility(confidential_asset, ctx)
    assert verdict.allowed is False
    assert verdict.reason == "cmis_sensitivity_denied"
    assert verdict.context["sensitivity"] == "confidential"
def test_profile_visibility_can_be_scoped_by_type_topic_source_and_metadata() -> None:
    """A custom profile filters assets by topic and by denied metadata values."""
    scoped = CMISAccessProfile(
        name="topic-source-scope",
        capabilities=(CMISCapability.OBJECT_READ,),
        visible_asset_types=("document",),
        visible_topics=("approved",),
        visible_source_systems=("sharepoint",),
        denied_metadata={"export_blocked": (True,)},
    )
    ctx = _context()

    # Matches every scope constraint and carries no denied metadata.
    in_scope = _asset(
        "asset-allowed",
        "internal",
        topics=("approved",),
        metadata={"source_system": "sharepoint"},
    )
    # Same source system, but its only topic is outside the visible set.
    off_topic = _asset(
        "asset-wrong-topic",
        "internal",
        topics=("draft",),
        metadata={"source_system": "sharepoint"},
    )
    # In scope by topic and source, but flagged with a denied metadata value.
    export_blocked = _asset(
        "asset-blocked",
        "internal",
        topics=("approved",),
        metadata={"source_system": "sharepoint", "export_blocked": True},
    )

    assert scoped.exposes_asset(in_scope, ctx) is True

    topic_verdict = scoped.decide_asset_visibility(off_topic, ctx)
    assert topic_verdict.reason == "cmis_topic_not_visible"

    metadata_verdict = scoped.decide_asset_visibility(export_blocked, ctx)
    assert metadata_verdict.reason == "cmis_metadata_denied"
def test_admin_export_requires_service_account_actor() -> None:
    """Admin export denies a human actor and allows a service-account actor."""
    export_profile = CMISAccessProfile.admin_export()
    as_human = _context(ActorType.HUMAN)
    as_service = _context(ActorType.SERVICE_ACCOUNT)
    confidential_asset = _asset("asset-confidential", "confidential")

    # Human actor: neither the action nor the asset is available.
    human_read = export_profile.decide_action(CMISAction.GET_OBJECT, as_human)
    assert human_read.allowed is False
    assert export_profile.exposes_asset(confidential_asset, as_human) is False

    # Service account: the same action and asset are both available.
    service_read = export_profile.decide_action(CMISAction.GET_OBJECT, as_service)
    assert service_read.allowed is True
    assert export_profile.exposes_asset(confidential_asset, as_service) is True
def test_access_point_normalizes_base_path_and_round_trips() -> None:
    """An access point gains a leading slash and survives a dict round trip."""
    original = CMISAccessPoint(
        access_point_id="cmis-readonly",
        repository_id="kontextual-readonly",
        profile=CMISAccessProfile.readonly_browser(),
        base_path="cmis/readonly/browser",
        metadata={"owner": "codex"},
    )
    payload = original.to_dict()
    rebuilt = CMISAccessPoint.from_dict(payload)

    # The base path was supplied without a leading slash above.
    assert original.base_path == "/cmis/readonly/browser"
    assert rebuilt == original

    # The rebuilt access point still answers action decisions.
    info_decision = rebuilt.decide_action(CMISAction.GET_REPOSITORY_INFO, _context())
    assert info_decision.allowed is True
def test_disabled_access_point_denies_all_actions_and_visibility() -> None:
    """A disabled access point reports the disabled reason and hides assets."""
    disabled_point = CMISAccessPoint(
        access_point_id="cmis-disabled",
        repository_id="kontextual-disabled",
        profile=CMISAccessProfile.governed_authoring(),
        base_path="/cmis/disabled/browser",
        enabled=False,
    )
    ctx = _context()

    read_decision = disabled_point.decide_action(CMISAction.GET_OBJECT, ctx)
    assert read_decision.reason == "cmis_access_point_disabled"

    public_asset = _asset("asset-public", "public")
    assert disabled_point.exposes_asset(public_asset, ctx) is False