"""Versioned FastAPI service skeleton. The service layer is intentionally thin: route handlers translate HTTP requests into service/runtime contracts and must not own domain behavior. """ from __future__ import annotations import json import re from dataclasses import dataclass, field, replace from datetime import datetime from email import policy from email.parser import BytesParser from importlib import metadata from typing import Any from urllib.parse import parse_qs from kontextual_engine.adapters.memory import InMemoryAssetRegistryRepository, InMemoryBlobStorage from kontextual_engine.core import ( Actor, ActorType, AssetRepresentation, AssetVersion, AuditEvent, AuditOutcome, Classification, CMISAccessPoint, CMISAccessProfile, CMISAction, CMISBaseType, CMISDomainMapper, ContextEntity, ContextEntityType, IngestionIdentityPolicy, IngestionJobStatus, LifecycleState, MetadataRecord, OperationContext, PolicyDecision, PolicyEffect, RelationshipTargetKind, RepresentationKind, RetrievalFeedbackLabel, SourceReference, TransformationRunStatus, VersionChangeType, WorkflowExceptionKind, WorkflowExceptionStatus, WorkflowInputDefinition, WorkflowReviewDecisionType, WorkflowReviewStatus, WorkflowRunStatus, WorkflowStepDefinition, WorkflowTemplate, content_digest, new_id, stable_json_dumps, utc_now, ) from kontextual_engine.core.cmis import ( cmis_browser_object, cmis_browser_object_in_folder_list, cmis_browser_parent_list, cmis_browser_query_result, cmis_browser_root_folder, cmis_browser_service_document, cmis_browser_type_children, cmis_browser_type_descendants, cmis_browser_type_definition_by_id, ) from kontextual_engine.errors import AuthorizationError, KontextualError, NotFoundError, ValidationError from kontextual_engine.ports import AllowAllPolicyGateway, AssetRegistryRepository, BlobRef, BlobStorage, PolicyGateway from kontextual_engine.services import ( AssetIngestionService, AssetQueryRequest, AssetRegistryService, AssetRetrievalService, ContextEntityQueryRequest, RelationshipQueryRequest, RepresentationContentService, RepresentationContentStream, RetrievalFeedbackRequest, TransformationRequest, TransformationService, WorkflowInvocation, WorkflowService, ) API_VERSION = "v1" OPENAPI_VERSION = "1.0.0" CMIS_APPEND_MAX_COMPOSED_BYTES = 64 * 1024 * 1024 CMIS_QUERY_SUPPORTED = [ "SELECT * FROM cmis:document", "SELECT * FROM kontextual:document", "SELECT * FROM cmis:document WHERE = '' [AND ...]", "SELECT * FROM cmis:document WHERE LIKE '' [AND ...]", "SELECT * FROM cmis:document WHERE IN ('', ...) [AND ...]", "SELECT * FROM cmis:document ... ORDER BY [ASC|DESC]", ] CMIS_QUERY_FILTERABLE_FIELDS = { "cmis:objectId", "cmis:name", "cmis:objectTypeId", "cmis:baseTypeId", "cmis:description", "kontextual:assetId", "kontextual:assetType", "kontextual:sensitivity", "kontextual:lifecycle", "kontextual:owner", "kontextual:topics", "kontextual:reviewState", } CMIS_QUERY_ORDERABLE_FIELDS = { "cmis:objectId", "cmis:name", "cmis:creationDate", "cmis:lastModificationDate", } CMIS_QUERY_LIKE_FIELDS = { "cmis:name", "cmis:description", "kontextual:assetId", "kontextual:assetType", "kontextual:owner", "kontextual:topics", "kontextual:reviewState", } AGENT_OPERATION_CATALOG: tuple[dict[str, Any], ...] = ( { "operation_id": "inspect_asset", "description": "Read one asset envelope by ID.", "input_schema": {"required": ["asset_id"]}, "output_schema": {"type": "asset"}, "required_permissions": ["agent.operation.inspect_asset", "asset.retrieve"], "audit_operation": "agent.operation.inspect_asset", "failure_modes": ["not_found", "permission_denied"], "dry_run_supported": True, }, { "operation_id": "retrieve_asset", "description": "Read one source-grounded asset bundle with metadata, representations, and relationships.", "input_schema": {"required": ["asset_id"]}, "output_schema": {"type": "asset_bundle"}, "required_permissions": ["agent.operation.retrieve_asset", "asset.retrieve"], "audit_operation": "agent.operation.retrieve_asset", "failure_modes": ["not_found", "permission_denied"], "dry_run_supported": True, }, { "operation_id": "search_assets", "description": "Run a governed retrieval query over assets.", "input_schema": {"required": ["query"]}, "output_schema": {"type": "asset_query_result"}, "required_permissions": ["agent.operation.search_assets", "retrieval.assets.query"], "audit_operation": "agent.operation.search_assets", "failure_modes": ["permission_denied", "validation_error", "zero_results"], "dry_run_supported": True, }, { "operation_id": "assemble_context", "description": "Assemble a non-durable source-grounded context preview from a bounded asset query.", "input_schema": {"required": ["query"], "optional": ["intent", "instructions", "constraints"]}, "output_schema": {"type": "context_preview"}, "required_permissions": ["agent.operation.assemble_context", "retrieval.assets.query"], "audit_operation": "agent.operation.assemble_context", "failure_modes": ["permission_denied", "validation_error", "zero_results"], "dry_run_supported": True, }, { "operation_id": "enrich_metadata", "description": "Add one metadata record to an asset.", "input_schema": {"required": ["asset_id", "metadata"]}, "output_schema": {"type": "asset_change"}, "required_permissions": ["agent.operation.enrich_metadata", "asset.metadata.add"], "audit_operation": "agent.operation.enrich_metadata", "failure_modes": ["not_found", "permission_denied", "validation_error", "version_conflict"], "dry_run_supported": True, }, { "operation_id": "classify_asset", "description": "Request the classify transformation operation for an asset.", "input_schema": {"required": ["asset_id"]}, "output_schema": {"type": "transformation_run_result"}, "required_permissions": ["agent.operation.classify_asset", "transformation.run.execute"], "audit_operation": "agent.operation.classify_asset", "failure_modes": ["permission_denied", "adapter_unavailable", "operation_failed"], "dry_run_supported": True, }, { "operation_id": "transform_asset", "description": "Execute a registered transformation operation.", "input_schema": {"required": ["transformation"]}, "output_schema": {"type": "transformation_run_result"}, "required_permissions": ["agent.operation.transform_asset", "transformation.run.execute"], "audit_operation": "agent.operation.transform_asset", "failure_modes": ["permission_denied", "adapter_unavailable", "operation_failed"], "dry_run_supported": True, }, { "operation_id": "invoke_workflow", "description": "Invoke a registered workflow template.", "input_schema": {"required": ["workflow"]}, "output_schema": {"type": "workflow_run_result"}, "required_permissions": ["agent.operation.invoke_workflow", "workflow.run.execute"], "audit_operation": "agent.operation.invoke_workflow", "failure_modes": ["not_found", "permission_denied", "operation_failed", "review_required"], "dry_run_supported": True, }, { "operation_id": "submit_review", "description": "Submit a decision for one open workflow review task.", "input_schema": {"required": ["run_id", "review_id", "decision"]}, "output_schema": {"type": "workflow_run_result"}, "required_permissions": ["agent.operation.submit_review", "workflow.review.decide"], "audit_operation": "agent.operation.submit_review", "failure_modes": ["not_found", "permission_denied", "review_not_open", "operation_failed"], "dry_run_supported": True, }, { "operation_id": "report_result", "description": "Record an agent result report without mutating domain assets.", "input_schema": {"required": ["summary"], "optional": ["result_ref", "metadata"]}, "output_schema": {"type": "agent_report"}, "required_permissions": ["agent.operation.report_result"], "audit_operation": "agent.operation.report_result", "failure_modes": ["permission_denied", "validation_error"], "dry_run_supported": True, }, ) @dataclass class CMISWorkspaceFolder: access_point_id: str object_id: str path: str name: str parent_id: str created_by: str created_at: str updated_at: str lifecycle: str = LifecycleState.ACTIVE.value @dataclass class ServiceRuntime: repository: AssetRegistryRepository = field(default_factory=InMemoryAssetRegistryRepository) blob_storage: BlobStorage = field(default_factory=InMemoryBlobStorage) policy_gateway: PolicyGateway = field(default_factory=AllowAllPolicyGateway) cmis_workspace_folders: dict[str, dict[str, CMISWorkspaceFolder]] = field(default_factory=dict) api_version: str = API_VERSION service_name: str = "kontextual-engine" started_at: str = field(default_factory=lambda: utc_now().isoformat()) def asset_service(self) -> AssetRegistryService: return AssetRegistryService(self.repository, policy_gateway=self.policy_gateway) def ingestion_service(self) -> AssetIngestionService: return AssetIngestionService(self.repository, asset_service=self.asset_service()) def retrieval_service(self) -> AssetRetrievalService: return AssetRetrievalService(self.repository, policy_gateway=self.policy_gateway) def content_service(self) -> RepresentationContentService: return RepresentationContentService( self.repository, self.blob_storage, policy_gateway=self.policy_gateway, asset_service=self.asset_service(), ) def transformation_service(self) -> TransformationService: return TransformationService( self.repository, policy_gateway=self.policy_gateway, asset_service=self.asset_service(), ) def workflow_service(self) -> WorkflowService: return WorkflowService( self.repository, transformation_service=self.transformation_service(), policy_gateway=self.policy_gateway, ) def operation_context( self, *, actor_id: str = "api-user", actor_type: str = "human", display_name: str | None = None, external_ref: str | None = None, correlation_id: str | None = None, groups: list[str] | None = None, delegated_actor_id: str | None = None, delegated_actor_type: str = "human", delegated_actor_display_name: str | None = None, delegated_actor_external_ref: str | None = None, delegated_actor_groups: list[str] | None = None, request_scope: dict[str, Any] | None = None, policy_scope: dict[str, Any] | None = None, agent_id: str | None = None, agent_name: str | None = None, agent_run_id: str | None = None, agent_tool: str | None = None, metadata: dict[str, Any] | None = None, ) -> OperationContext: actor_metadata = dict(metadata or {}) agent_metadata = _agent_metadata( agent_id=agent_id, agent_name=agent_name, agent_run_id=agent_run_id, agent_tool=agent_tool, ) if agent_metadata: actor_metadata["agent"] = agent_metadata actor = Actor.create( ActorType(actor_type), actor_id=actor_id, display_name=display_name, external_ref=external_ref, groups=groups, metadata=actor_metadata, ) delegated_actor = None if delegated_actor_id: delegated_actor = Actor.create( ActorType(delegated_actor_type), actor_id=delegated_actor_id, display_name=delegated_actor_display_name, external_ref=delegated_actor_external_ref, groups=delegated_actor_groups, ) context_metadata: dict[str, Any] = {} if agent_metadata: context_metadata["agent"] = agent_metadata if delegated_actor is not None: context_metadata["delegation"] = { "mode": "on_behalf_of", "actor_id": actor.id, "delegated_actor_id": delegated_actor.id, } return OperationContext.create( actor, correlation_id=correlation_id, delegated_actor=delegated_actor, request_scope=request_scope, policy_scope=policy_scope, metadata=context_metadata, ) @property def package_version(self) -> str: try: return metadata.version("kontextual-engine") except metadata.PackageNotFoundError: return "0.1.0" def health(self) -> dict[str, Any]: return { "status": "ok", "service": self.service_name, "api_version": self.api_version, "package_version": self.package_version, "started_at": self.started_at, } def readiness(self) -> dict[str, Any]: checks: dict[str, dict[str, Any]] = {} try: asset_count = len(self.repository.list_assets()) checks["asset_registry"] = { "status": "ok", "repository": type(self.repository).__name__, "asset_count": asset_count, } except Exception as exc: checks["asset_registry"] = { "status": "error", "repository": type(self.repository).__name__, "error_type": type(exc).__name__, "message": str(exc), } ready = all(item["status"] == "ok" for item in checks.values()) return { "status": "ready" if ready else "not_ready", "ready": ready, "service": self.service_name, "api_version": self.api_version, "checks": checks, } def version(self) -> dict[str, Any]: return { "service": self.service_name, "api_version": self.api_version, "package_version": self.package_version, "openapi_version": OPENAPI_VERSION, } def cmis_access_points(self) -> dict[str, Any]: access_points = [_cmis_access_point(profile) for profile in _cmis_profiles()] return {"items": [access_point.to_dict() for access_point in access_points], "count": len(access_points)} def cmis_repository_info(self, access_point_id: str) -> dict[str, Any]: return self._cmis_mapper(access_point_id).repository_info() def cmis_browser_service_document( self, access_point_id: str, *, repository_url: str, root_folder_url: str, ) -> dict[str, Any]: return cmis_browser_service_document( self.cmis_repository_info(access_point_id), repository_url=repository_url, root_folder_url=root_folder_url, ) def cmis_type_definitions(self, access_point_id: str) -> dict[str, Any]: definitions = self._cmis_mapper(access_point_id).type_definitions() return {"items": definitions, "count": len(definitions)} def cmis_browser_type_children( self, access_point_id: str, *, type_id: str | None = None, skip_count: int = 0, max_items: int = 100, include_property_definitions: bool = False, ) -> dict[str, Any]: return cmis_browser_type_children( self._cmis_mapper(access_point_id).type_definitions(), type_id=type_id, skip_count=skip_count, max_items=max_items, include_property_definitions=include_property_definitions, ) def cmis_browser_type_descendants( self, access_point_id: str, *, type_id: str | None = None, include_property_definitions: bool = False, ) -> list[dict[str, Any]]: return cmis_browser_type_descendants( self._cmis_mapper(access_point_id).type_definitions(), type_id=type_id, include_property_definitions=include_property_definitions, ) def cmis_browser_type_definition( self, access_point_id: str, *, type_id: str | None, ) -> dict[str, Any]: try: return cmis_browser_type_definition_by_id( self._cmis_mapper(access_point_id).type_definitions(), type_id, ) except KeyError as exc: raise NotFoundError( "CMIS type definition not found", details={"access_point_id": access_point_id, "type_id": type_id}, ) from exc def cmis_browser_root_object( self, access_point_id: str, *, property_filter: str | None = None, include_allowable_actions: bool = True, include_acl: bool = True, ) -> dict[str, Any]: return cmis_browser_object( cmis_browser_root_folder(self._cmis_access_point(access_point_id)), property_filter=property_filter, include_allowable_actions=include_allowable_actions, include_acl=include_acl, ) def cmis_browser_object( self, access_point_id: str, object_id: str | None, context: OperationContext, *, property_filter: str | None = None, include_allowable_actions: bool = True, include_acl: bool = True, ) -> dict[str, Any]: if object_id in (None, "", "cmis-root", "root", "/"): return self.cmis_browser_root_object( access_point_id, property_filter=property_filter, include_allowable_actions=include_allowable_actions, include_acl=include_acl, ) if object_id.startswith("cmis:folder:"): folder_path = _cmis_folder_path(object_id) or "/" mapper = self._cmis_mapper(access_point_id) workspace_folder = self._cmis_workspace_folder_by_object_id(access_point_id, object_id) if workspace_folder is not None: return cmis_browser_object( self._cmis_workspace_folder_projection(mapper, workspace_folder), property_filter=property_filter, include_allowable_actions=include_allowable_actions, include_acl=include_acl, ) if not self._cmis_folder_exists(mapper, context, folder_path): raise NotFoundError( "CMIS folder not found", details={"object_id": object_id, "access_point_id": access_point_id}, ) return cmis_browser_object( self._cmis_folder_projection(access_point_id, folder_path), property_filter=property_filter, include_allowable_actions=include_allowable_actions, include_acl=include_acl, ) return cmis_browser_object( self.cmis_object(access_point_id, object_id, context), property_filter=property_filter, include_allowable_actions=include_allowable_actions, include_acl=include_acl, ) def cmis_browser_object_by_path( self, access_point_id: str, path: str, context: OperationContext, *, property_filter: str | None = None, include_allowable_actions: bool = True, include_acl: bool = True, ) -> dict[str, Any]: return cmis_browser_object( self.cmis_object_by_path(access_point_id, path, context), property_filter=property_filter, include_allowable_actions=include_allowable_actions, include_acl=include_acl, ) def cmis_browser_children( self, access_point_id: str, context: OperationContext, *, object_id: str | None = None, skip_count: int = 0, max_items: int = 100, property_filter: str | None = None, include_allowable_actions: bool = True, include_acl: bool = True, include_path_segment: bool = True, ) -> dict[str, Any]: children = self.cmis_children( access_point_id, context, folder_id=object_id, skip_count=skip_count, max_items=max_items, ) return cmis_browser_object_in_folder_list( children, property_filter=property_filter, include_allowable_actions=include_allowable_actions, include_acl=include_acl, include_path_segment=include_path_segment, ) def cmis_browser_parents( self, access_point_id: str, object_id: str, context: OperationContext, *, include_relative_path_segment: bool = True, ) -> list[dict[str, Any]]: return cmis_browser_parent_list( self.cmis_object_parents(access_point_id, object_id, context), include_relative_path_segment=include_relative_path_segment, ) def cmis_browser_parent( self, access_point_id: str, object_id: str, context: OperationContext, ) -> dict[str, Any]: parents = self.cmis_object_parents(access_point_id, object_id, context).get("parents", []) if not parents: raise NotFoundError( "CMIS folder parent not found", details={"object_id": object_id, "access_point_id": access_point_id}, ) return cmis_browser_object(parents[0]) def cmis_browser_query( self, access_point_id: str, query: str, context: OperationContext, *, skip_count: int = 0, max_items: int = 100, ) -> dict[str, Any]: return cmis_browser_query_result( self.cmis_query( access_point_id, query, context, skip_count=skip_count, max_items=max_items, ) ) def cmis_children( self, access_point_id: str, context: OperationContext, *, folder_id: str | None = None, skip_count: int = 0, max_items: int = 100, ) -> dict[str, Any]: mapper = self._cmis_mapper(access_point_id) decision = mapper.access_point.decide_action(CMISAction.GET_CHILDREN, context) if not decision.allowed: raise _cmis_authorization_error(decision, "getChildren") folder_path = _cmis_folder_path(folder_id) projections = self._cmis_children_for_folder(mapper, context, folder_path=folder_path) paged = projections[max(skip_count, 0) : max(skip_count, 0) + max(max_items, 0)] return { "folder_id": folder_id or mapper.access_point.root_folder_id, "folder_path": folder_path or "/", "objects": paged, "num_items": len(paged), "has_more_items": len(projections) > max(skip_count, 0) + len(paged), "total_num_items": len(projections), } def cmis_object( self, access_point_id: str, object_id: str, context: OperationContext, ) -> dict[str, Any]: mapper = self._cmis_mapper(access_point_id) decision = mapper.access_point.decide_action(CMISAction.GET_OBJECT, context, resource=object_id) if not decision.allowed: raise _cmis_authorization_error(decision, "getObject") if object_id.startswith("cmis:folder:"): folder_path = _cmis_folder_path(object_id) or "/" workspace_folder = self._cmis_workspace_folder_by_object_id(access_point_id, object_id) if workspace_folder is not None: return self._cmis_workspace_folder_projection(mapper, workspace_folder) if not self._cmis_folder_exists(mapper, context, folder_path): raise NotFoundError( "CMIS folder not found", details={"object_id": object_id, "access_point_id": access_point_id}, ) return self._cmis_folder_projection(access_point_id, folder_path) asset_id = _cmis_asset_id(object_id) asset = self.repository.get_asset(asset_id) projection = mapper.map_asset( asset, context, representations=self._cmis_asset_representations(asset), versions=self.repository.list_versions(asset.id), relationship_ids=[ f"cmis:relationship:{relationship.relationship_id}" for relationship in self.repository.list_relationships(source_id=asset.id) ], metadata_records=self.repository.list_metadata_records(asset.id), ) if projection is None: raise NotFoundError( "CMIS object not found", details={"object_id": object_id, "access_point_id": access_point_id}, ) return projection.to_dict() def cmis_object_by_path( self, access_point_id: str, path: str, context: OperationContext, ) -> dict[str, Any]: mapper = self._cmis_mapper(access_point_id) normalized = _normalize_cmis_path(path) if normalized == "/": return cmis_browser_root_folder(mapper.access_point) workspace_folder = self._cmis_workspace_folder_map(access_point_id).get(normalized) if workspace_folder is not None: return self._cmis_workspace_folder_projection(mapper, workspace_folder) for asset in self.repository.list_assets(): if not mapper.access_point.exposes_asset(asset, context): continue if normalized not in mapper.asset_paths(asset): continue projection = mapper.map_asset( asset, context, representations=self._cmis_asset_representations(asset), versions=self.repository.list_versions(asset.id), relationship_ids=[ f"cmis:relationship:{relationship.relationship_id}" for relationship in self.repository.list_relationships(source_id=asset.id) if self._cmis_relationship_visible(mapper, relationship, context) ], metadata_records=self.repository.list_metadata_records(asset.id), ) if projection is not None: return projection.to_dict() if any( folder.path != normalized and _path_contains(normalized, folder.path) for folder in self._cmis_workspace_folder_map(access_point_id).values() ): return mapper.folder_projection(normalized) for asset in self.repository.list_assets(): if not mapper.access_point.exposes_asset(asset, context): continue if any(asset_path != normalized and _path_contains(normalized, asset_path) for asset_path in mapper.asset_paths(asset)): return mapper.folder_projection(normalized) raise NotFoundError( "CMIS object path not found", details={"path": normalized, "access_point_id": access_point_id}, ) def cmis_content_stream( self, access_point_id: str, object_id: str, context: OperationContext, ) -> dict[str, Any]: mapper = self._cmis_mapper(access_point_id) decision = mapper.access_point.decide_action(CMISAction.GET_CONTENT_STREAM, context, resource=object_id) if not decision.allowed: raise _cmis_authorization_error(decision, "getContentStream") object_projection = self.cmis_object(access_point_id, object_id, context) content_stream = object_projection.get("content_stream") if not content_stream: raise NotFoundError( "CMIS content stream not found", details={"object_id": object_id, "access_point_id": access_point_id}, ) return content_stream def cmis_content_stream_bytes( self, access_point_id: str, object_id: str, context: OperationContext, ): mapper = self._cmis_mapper(access_point_id) decision = mapper.access_point.decide_action(CMISAction.GET_CONTENT_STREAM, context, resource=object_id) if not decision.allowed: raise _cmis_authorization_error(decision, "getContentStream") asset_id = _cmis_asset_id(object_id) asset = self.repository.get_asset(asset_id) if not mapper.access_point.exposes_asset(asset, context): raise NotFoundError( "CMIS object not found", details={"object_id": object_id, "access_point_id": access_point_id}, ) if asset.metadata.get("cmis_content_deleted"): raise ValidationError( "CMIS document has no content stream", details={ "code": "cmis.no_content_stream", "cmis_exception": "constraint", "object_id": object_id, "access_point_id": access_point_id, }, ) if not self._cmis_asset_representations(asset): digest = content_digest(b"") representation = AssetRepresentation( asset_id=asset_id, kind=RepresentationKind.SOURCE, media_type="", digest=digest, size_bytes=0, storage_ref="", producer="cmis-empty-content-stream", ) decision = PolicyDecision.allow( context.actor.id, "asset.content_stream.read", f"asset:{asset.id}", reason="CMIS document has no content stream; returning an empty compatibility stream.", ) return RepresentationContentStream( representation, (), BlobRef( digest=digest, size_bytes=0, storage_key="", storage_ref="", adapter="cmis-empty", media_type=representation.media_type, ), decision, AuditEvent.from_context( "asset.content_stream.read", f"asset:{asset.id}", AuditOutcome.SUCCESS, context, policy_decision=decision, details={"cmis_empty_content_stream": True}, ), ) return self.content_service().stream_content(asset_id, context) def cmis_acl( self, access_point_id: str, object_id: str, context: OperationContext, ) -> dict[str, Any]: mapper = self._cmis_mapper(access_point_id) decision = mapper.access_point.decide_action(CMISAction.GET_ACL, context, resource=object_id) if not decision.allowed: raise _cmis_authorization_error(decision, "getACL") asset_id = _cmis_asset_id(object_id) asset = self.repository.get_asset(asset_id) acl = mapper.acl_for_asset(asset, context) if acl is None: raise NotFoundError( "CMIS object not found", details={"object_id": object_id, "access_point_id": access_point_id}, ) return acl def cmis_object_parents( self, access_point_id: str, object_id: str, context: OperationContext, ) -> dict[str, Any]: mapper = self._cmis_mapper(access_point_id) decision = mapper.access_point.decide_action(CMISAction.GET_OBJECT_PARENTS, context, resource=object_id) if not decision.allowed: raise _cmis_authorization_error(decision, "getObjectParents") if object_id.startswith("cmis:folder:"): workspace_folder = self._cmis_workspace_folder_by_object_id(access_point_id, object_id) folder_path = workspace_folder.path if workspace_folder is not None else (_cmis_folder_path(object_id) or "/") if not self._cmis_folder_exists(mapper, context, folder_path): raise NotFoundError( "CMIS folder not found", details={"object_id": object_id, "access_point_id": access_point_id}, ) if folder_path == "/": return {"object_id": object_id, "parents": [], "count": 0} parent_path = _path_parent(folder_path) parent = self._cmis_folder_projection(access_point_id, parent_path) parent["relative_path_segment"] = _path_name(folder_path) return {"object_id": object_id, "parents": [parent], "count": 1} asset_id = _cmis_asset_id(object_id) asset = self.repository.get_asset(asset_id) if not mapper.access_point.exposes_asset(asset, context): raise NotFoundError( "CMIS object not found", details={"object_id": object_id, "access_point_id": access_point_id}, ) explicit_cmis_path = asset.metadata.get("cmis_path") parent_paths = ( [_path_parent(str(explicit_cmis_path))] if explicit_cmis_path else [parent["path"] for parent in mapper.parent_folders_for_asset(asset)] ) child_segment = _path_name(str(explicit_cmis_path)) if explicit_cmis_path else str( asset.metadata.get("file_name") or asset.title ) parents = [] for path in dict.fromkeys(parent_paths): parent = self._cmis_folder_projection(access_point_id, path) parent["relative_path_segment"] = child_segment parents.append(parent) return {"object_id": mapper.asset_object_id(asset.id), "parents": parents, "count": len(parents)} def cmis_create_folder( self, access_point_id: str, payload: dict[str, Any], context: OperationContext, *, parent_folder_id: str | None = None, ) -> dict[str, Any]: mapper = self._cmis_mapper(access_point_id) decision = mapper.access_point.decide_action(CMISAction.CREATE_FOLDER, context) if not decision.allowed: raise _cmis_authorization_error(decision, "createFolder") properties = dict(payload.get("properties", {})) name = str(payload.get("name") or properties.get("cmis:name") or "").strip() if not name: raise ValidationError("CMIS folder name is required", details={"operation": "createFolder"}) type_id = properties.get("cmis:objectTypeId", payload.get("type_id", CMISBaseType.FOLDER.value)) if type_id not in {CMISBaseType.FOLDER.value, "kontextual:folder"}: raise ValidationError( "Unsupported CMIS folder type", details={"operation": "createFolder", "type_id": type_id, "supported": [CMISBaseType.FOLDER.value]}, ) parent_id = parent_folder_id or payload.get("folder_id") or payload.get("folderId") or "cmis-root" parent_path = _cmis_folder_path(parent_id) or "/" folder_path = _normalize_cmis_path(f"{parent_path}/{name}") folders = self._cmis_workspace_folder_map(access_point_id) if folder_path in folders: raise ValidationError( "CMIS folder already exists", details={"operation": "createFolder", "path": folder_path}, ) parent_object_id = "cmis-root" if parent_path == "/" else mapper.folder_object_id(parent_path) now = utc_now().isoformat() folder = CMISWorkspaceFolder( access_point_id=access_point_id, object_id=mapper.folder_object_id(folder_path), path=folder_path, name=_path_name(folder_path), parent_id=parent_object_id, created_by=context.actor.id, created_at=now, updated_at=now, ) folders[folder.path] = folder return self._cmis_workspace_folder_projection(mapper, folder) def cmis_browser_create_folder( self, access_point_id: str, payload: dict[str, Any], context: OperationContext, *, parent_folder_id: str | None = None, ) -> dict[str, Any]: return cmis_browser_object( self.cmis_create_folder(access_point_id, payload, context, parent_folder_id=parent_folder_id) ) def cmis_create_document( self, access_point_id: str, payload: dict[str, Any], context: OperationContext, ) -> dict[str, Any]: mapper = self._cmis_mapper(access_point_id) decision = mapper.access_point.decide_action(CMISAction.CREATE_DOCUMENT, context) if not decision.allowed: raise _cmis_authorization_error(decision, "createDocument") properties = dict(payload.get("properties", {})) name = payload.get("name") or properties.get("cmis:name") if not name: raise ValidationError("CMIS document name is required", details={"operation": "createDocument"}) type_id = properties.get("cmis:objectTypeId", payload.get("type_id", CMISBaseType.DOCUMENT.value)) asset_type = payload.get("asset_type", "document") if type_id not in {CMISBaseType.DOCUMENT.value, f"kontextual:{asset_type}"}: raise ValidationError( "Unsupported CMIS document type", details={"operation": "createDocument", "type_id": type_id, "supported": [CMISBaseType.DOCUMENT.value]}, ) classification = Classification.from_dict( { "asset_type": asset_type, "sensitivity": payload.get("sensitivity", "internal"), "owner": payload.get("owner"), "topics": payload.get("topics", []), "metadata": dict(payload.get("classification_metadata", {})), } ) asset_id = payload.get("asset_id") or new_id("asset") content = payload.get("content") representations = [] if content is not None: representation, _blob, _created = self.content_service().build_representation_from_bytes( asset_id, RepresentationKind.SOURCE, _cmis_media_type(payload.get("media_type", "text/plain")), content, metadata={"cmis": {"operation": "createDocument"}}, ) representations.append(representation) result = self.asset_service().create_asset( str(name), classification, context, asset_id=asset_id, representations=representations, metadata_records=[_metadata_record(item) for item in payload.get("metadata_records", [])], idempotency_key=payload.get("idempotency_key"), ) metadata = dict(result.asset.metadata) if "cmis:description" in properties: metadata["description"] = str(properties.get("cmis:description") or "") if "cmis:secondaryObjectTypeIds" in properties: metadata["cmis_secondary_object_type_ids"] = _cmis_value_list(properties.get("cmis:secondaryObjectTypeIds")) folder_id = payload.get("folder_id") or payload.get("folderId") if folder_id: folder_path = _cmis_folder_path(folder_id) or "/" asset_path = _normalize_cmis_path(f"{folder_path}/{name}") metadata = { **metadata, "cmis_path": asset_path, "cmis_parent_folder_id": "cmis-root" if folder_path == "/" else mapper.folder_object_id(folder_path), "file_name": str(name), } if metadata != result.asset.metadata: self.repository.save_asset(replace(result.asset, metadata=metadata)) return self.cmis_object(access_point_id, mapper.asset_object_id(result.asset.id), context) def cmis_browser_create_document( self, access_point_id: str, payload: dict[str, Any], context: OperationContext, *, parent_folder_id: str | None = None, ) -> dict[str, Any]: payload = dict(payload) if parent_folder_id and "folder_id" not in payload and "folderId" not in payload: payload["folder_id"] = parent_folder_id return cmis_browser_object(self.cmis_create_document(access_point_id, payload, context)) def cmis_create_document_from_source( self, access_point_id: str, payload: dict[str, Any], context: OperationContext, ) -> dict[str, Any]: mapper = self._cmis_mapper(access_point_id) decision = mapper.access_point.decide_action(CMISAction.CREATE_DOCUMENT, context) if not decision.allowed: raise _cmis_authorization_error(decision, "createDocumentFromSource") properties = _cmis_browser_properties(payload) type_id = properties.get("cmis:objectTypeId") if type_id and type_id not in {CMISBaseType.DOCUMENT.value, "kontextual:document"}: raise ValidationError( "Invalid CMIS source-copy document type", details={"operation": "createDocumentFromSource", "type_id": type_id}, ) source_object_id = payload.get("sourceId") or payload.get("source_id") or payload.get("sourceObjectId") if not source_object_id: raise ValidationError( "CMIS source object id is required", details={"operation": "createDocumentFromSource", "parameter": "sourceId"}, ) source_asset_id = _cmis_asset_id(str(source_object_id)) source_asset = self.repository.get_asset(source_asset_id) if not mapper.access_point.exposes_asset(source_asset, context): raise NotFoundError( "CMIS source object not found", details={"object_id": source_object_id, "access_point_id": access_point_id}, ) name = str(properties.get("cmis:name") or payload.get("name") or source_asset.title).strip() if not name: raise ValidationError("CMIS name cannot be empty", details={"operation": "createDocumentFromSource"}) asset_id = payload.get("asset_id") or new_id("asset") representations = [ replace( representation, asset_id=asset_id, representation_id=new_id("repr"), producer="cmis-createDocumentFromSource", metadata={ **representation.metadata, "cmis_source_object_id": str(source_object_id), "cmis_source_representation_id": representation.representation_id, }, ) for representation in self._cmis_asset_representations(source_asset) ] result = self.asset_service().create_asset( name, source_asset.classification, context, asset_id=asset_id, source_refs=list(source_asset.source_refs), representations=representations, idempotency_key=payload.get("idempotency_key"), ) metadata = { key: value for key, value in source_asset.metadata.items() if key not in {"cmis_path", "cmis_paths", "cmis_parent_folder_id", "cmis_content_deleted"} } metadata.update( { "cmis_copied_from_object_id": str(source_object_id), "cmis_copied_from_asset_id": source_asset.id, "file_name": name, } ) if "cmis:description" in properties: metadata["description"] = str(properties.get("cmis:description") or "") if "cmis:secondaryObjectTypeIds" in properties: metadata["cmis_secondary_object_type_ids"] = _cmis_value_list(properties.get("cmis:secondaryObjectTypeIds")) folder_id = payload.get("folder_id") or payload.get("folderId") if folder_id: folder_path = _cmis_folder_path(folder_id) or "/" metadata["cmis_path"] = _normalize_cmis_path(f"{folder_path}/{name}") metadata["cmis_parent_folder_id"] = "cmis-root" if folder_path == "/" else mapper.folder_object_id(folder_path) self.repository.save_asset(replace(result.asset, metadata=metadata)) return self.cmis_object(access_point_id, mapper.asset_object_id(result.asset.id), context) def cmis_browser_create_document_from_source( self, access_point_id: str, payload: dict[str, Any], context: OperationContext, *, parent_folder_id: str | None = None, ) -> dict[str, Any]: payload = dict(payload) if parent_folder_id and "folder_id" not in payload and "folderId" not in payload: payload["folder_id"] = parent_folder_id return cmis_browser_object(self.cmis_create_document_from_source(access_point_id, payload, context)) def cmis_bulk_update_properties( self, access_point_id: str, payload: dict[str, Any], context: OperationContext, ) -> list[dict[str, Any]]: mapper = self._cmis_mapper(access_point_id) decision = mapper.access_point.decide_action(CMISAction.BULK_UPDATE_PROPERTIES, context) if not decision.allowed: raise _cmis_authorization_error(decision, "bulkUpdateProperties") entries = _cmis_browser_bulk_entries(payload) if not entries: raise ValidationError( "CMIS bulk update object ids are required", details={"operation": "bulkUpdateProperties", "parameter": "objectId"}, ) properties = _cmis_browser_properties(payload) if not properties: raise ValidationError( "CMIS bulk update properties are required", details={"operation": "bulkUpdateProperties", "parameter": "propertyId"}, ) updated_entries: list[dict[str, Any]] = [] for entry in entries: update_payload: dict[str, Any] = {"properties": dict(properties)} if entry.get("change_token"): update_payload["expected_current_version_id"] = entry["change_token"] updated = self.cmis_update_properties(access_point_id, entry["object_id"], update_payload, context) updated_properties = updated.get("properties", {}) object_id_property = updated_properties.get("cmis:objectId") change_token_property = updated_properties.get("cmis:changeToken") updated_entries.append( { "id": entry["object_id"], "newId": object_id_property, "changeToken": change_token_property, } ) return updated_entries def _cmis_expected_change_token(self, payload: dict[str, Any], properties: dict[str, Any]) -> str | None: return properties.pop( "expected_current_version_id", payload.get("expected_current_version_id") or payload.get("changeToken") or payload.get("change_token"), ) def _cmis_assert_change_token( self, asset, expected_current_version_id: str | None, *, operation: str, ) -> None: if not expected_current_version_id: return if asset.current_version_id != expected_current_version_id: raise ValidationError( "CMIS change token conflict", details={ "code": "asset.version_conflict", "cmis_exception": "updateConflict", "operation": operation, "asset_id": asset.id, "expected_current_version_id": expected_current_version_id, "current_version_id": asset.current_version_id, }, ) def _cmis_record_asset_version( self, asset, context: OperationContext, *, change_type: VersionChangeType, operation_id: str, metadata_delta: dict[str, Any] | None = None, representation_ids: tuple[str, ...] = (), ): versions = self.repository.list_versions(asset.id) next_sequence = max((version.sequence for version in versions), default=0) + 1 version = AssetVersion( asset_id=asset.id, sequence=next_sequence, change_type=change_type, representation_ids=representation_ids, actor_id=context.actor.id, operation_id=operation_id, parent_version_id=asset.current_version_id, metadata_delta=dict(metadata_delta or {}), lifecycle=asset.lifecycle.value, ) updated = asset.with_current_version(version.version_id) self.repository.save_actor(context.actor) self.repository.save_asset(updated) self.repository.save_version(version) self.repository.save_audit_event( AuditEvent.from_context( operation_id, f"asset:{asset.id}", AuditOutcome.SUCCESS, context, details={"version_id": version.version_id}, ) ) return updated def cmis_update_properties( self, access_point_id: str, object_id: str, payload: dict[str, Any], context: OperationContext, ) -> dict[str, Any]: mapper = self._cmis_mapper(access_point_id) decision = mapper.access_point.decide_action(CMISAction.UPDATE_PROPERTIES, context, resource=object_id) if not decision.allowed: raise _cmis_authorization_error(decision, "updateProperties") properties = dict(payload.get("properties", payload)) expected = self._cmis_expected_change_token(payload, properties) if object_id.startswith("cmis:folder:"): return self._cmis_update_workspace_folder(mapper, object_id, properties, context) asset_id = _cmis_asset_id(object_id) asset_metadata_updates: dict[str, Any] = {} title_update: str | None = None for key, value in properties.items(): if key.startswith("cmis:"): if key == "cmis:name": title_update = str(value).strip() if not title_update: raise ValidationError("CMIS name cannot be empty", details={"operation": "updateProperties"}) asset_metadata_updates["file_name"] = title_update continue if key == "cmis:secondaryObjectTypeIds": asset_metadata_updates["cmis_secondary_object_type_ids"] = _cmis_value_list(value) continue if key == "cmis:description": asset_metadata_updates["description"] = str(value) if value is not None else "" continue raise ValidationError( "Unsupported CMIS property update", details={ "property": key, "operation": "updateProperties", "supported": [ "cmis:name", "cmis:description", "cmis:secondaryObjectTypeIds", "kontextual:metadata:", ], }, ) self.asset_service().add_metadata_record( asset_id, MetadataRecord(key=_cmis_metadata_key(key), value=value, confirmed=bool(payload.get("confirmed", True))), context, expected_current_version_id=expected, ) expected = None if asset_metadata_updates or title_update is not None: asset = self.repository.get_asset(asset_id) self._cmis_assert_change_token(asset, expected, operation="updateProperties") metadata_delta = dict(asset_metadata_updates) if title_update is not None: metadata_delta["title"] = title_update asset = self._cmis_record_asset_version( asset, context, change_type=VersionChangeType.METADATA_CHANGED, operation_id="cmis.updateProperties", metadata_delta=metadata_delta, ) metadata = {**asset.metadata, **asset_metadata_updates} if title_update is not None and asset.metadata.get("cmis_path"): current_path = _normalize_cmis_path(str(asset.metadata["cmis_path"])) parent_path = _path_parent(current_path) metadata["cmis_path"] = _normalize_cmis_path(f"{parent_path}/{title_update}") metadata["cmis_parent_folder_id"] = "cmis-root" if parent_path == "/" else mapper.folder_object_id(parent_path) self.repository.save_asset( replace(asset, title=title_update or asset.title, metadata=metadata) ) return self.cmis_object(access_point_id, object_id, context) def cmis_set_content_stream( self, access_point_id: str, object_id: str, payload: dict[str, Any], context: OperationContext, ) -> dict[str, Any]: mapper = self._cmis_mapper(access_point_id) decision = mapper.access_point.decide_action(CMISAction.SET_CONTENT_STREAM, context, resource=object_id) if not decision.allowed: raise _cmis_authorization_error(decision, "setContentStream") asset_id = _cmis_asset_id(object_id) expected = ( payload.get("expected_current_version_id") or payload.get("changeToken") or payload.get("change_token") ) asset = self.repository.get_asset(asset_id) if asset.metadata.get("cmis_content_deleted"): metadata = dict(asset.metadata) metadata.pop("cmis_content_deleted", None) self.repository.save_asset(replace(asset, metadata=metadata)) self.content_service().add_representation_from_bytes( asset_id, payload.get("kind", RepresentationKind.SOURCE.value), _cmis_media_type(payload.get("media_type", "text/plain")), payload.get("content", ""), context, expected_current_version_id=expected, metadata={"cmis": {"operation": "setContentStream"}}, ) return self.cmis_object(access_point_id, object_id, context) def cmis_append_content_stream( self, access_point_id: str, object_id: str, payload: dict[str, Any], context: OperationContext, ) -> dict[str, Any]: mapper = self._cmis_mapper(access_point_id) decision = mapper.access_point.decide_action(CMISAction.SET_CONTENT_STREAM, context, resource=object_id) if not decision.allowed: raise _cmis_authorization_error(decision, "appendContentStream") asset_id = _cmis_asset_id(object_id) asset = self.repository.get_asset(asset_id) if not mapper.access_point.exposes_asset(asset, context): raise NotFoundError( "CMIS object not found", details={"object_id": object_id, "access_point_id": access_point_id}, ) expected = ( payload.get("expected_current_version_id") or payload.get("changeToken") or payload.get("change_token") ) self._cmis_assert_change_token(asset, expected, operation="appendContentStream") appended = _cmis_payload_bytes(payload.get("content", b"")) is_last_chunk = _cmis_form_bool(payload.get("isLastChunk") or payload.get("is_last_chunk"), default=True) base = b"" kind = payload.get("kind", RepresentationKind.SOURCE.value) media_type = _cmis_media_type(payload.get("media_type", "application/octet-stream")) if self._cmis_asset_representations(asset): stream = self.content_service().get_content_stream(asset_id, context) base = stream.content kind = stream.representation.kind.value media_type = _cmis_media_type(payload.get("media_type") or stream.representation.media_type) elif asset.metadata.get("cmis_content_deleted"): metadata = dict(asset.metadata) metadata.pop("cmis_content_deleted", None) self.repository.save_asset(replace(asset, metadata=metadata)) composed_size = len(base) + len(appended) if composed_size > CMIS_APPEND_MAX_COMPOSED_BYTES: raise ValidationError( "CMIS append content stream exceeds the composed append limit", details={ "code": "cmis.append_content_limit_exceeded", "cmis_exception": "constraint", "operation": "appendContentStream", "max_size_bytes": CMIS_APPEND_MAX_COMPOSED_BYTES, "composed_size_bytes": composed_size, }, ) self.content_service().add_representation_from_bytes( asset_id, kind, media_type, base + appended, context, expected_current_version_id=expected, metadata={ "cmis": { "operation": "appendContentStream", "is_last_chunk": is_last_chunk, "appended_bytes": len(appended), } }, ) return self.cmis_object(access_point_id, object_id, context) def cmis_delete_content_stream( self, access_point_id: str, object_id: str, context: OperationContext, payload: dict[str, Any] | None = None, ) -> dict[str, Any]: mapper = self._cmis_mapper(access_point_id) decision = mapper.access_point.decide_action(CMISAction.SET_CONTENT_STREAM, context, resource=object_id) if not decision.allowed: raise _cmis_authorization_error(decision, "deleteContentStream") payload = payload or {} asset_id = _cmis_asset_id(object_id) asset = self.repository.get_asset(asset_id) if not mapper.access_point.exposes_asset(asset, context): raise NotFoundError( "CMIS object not found", details={"object_id": object_id, "access_point_id": access_point_id}, ) expected = ( payload.get("expected_current_version_id") or payload.get("changeToken") or payload.get("change_token") ) self._cmis_assert_change_token(asset, expected, operation="deleteContentStream") asset = self._cmis_record_asset_version( asset, context, change_type=VersionChangeType.CONTENT_CHANGED, operation_id="cmis.deleteContentStream", metadata_delta={"cmis_content_deleted": True}, representation_ids=tuple(representation.representation_id for representation in self._cmis_asset_representations(asset)), ) self.repository.save_asset(replace(asset, metadata={**asset.metadata, "cmis_content_deleted": True})) return self.cmis_object(access_point_id, object_id, context) def cmis_move_object( self, access_point_id: str, object_id: str, payload: dict[str, Any], context: OperationContext, ) -> dict[str, Any]: mapper = self._cmis_mapper(access_point_id) decision = mapper.access_point.decide_action(CMISAction.MOVE_OBJECT, context, resource=object_id) if not decision.allowed: raise _cmis_authorization_error(decision, "moveObject") target_folder_id = payload.get("targetFolderId") or payload.get("target_folder_id") if not target_folder_id: raise ValidationError("CMIS target folder id is required", details={"operation": "moveObject"}) target_path = _cmis_folder_path(str(target_folder_id)) or "/" if not self._cmis_folder_exists(mapper, context, target_path): raise NotFoundError( "CMIS target folder not found", details={"operation": "moveObject", "target_folder_id": target_folder_id}, ) source_folder_id = payload.get("sourceFolderId") or payload.get("source_folder_id") source_path = _cmis_folder_path(str(source_folder_id)) if source_folder_id else None if object_id.startswith("cmis:folder:"): return self._cmis_move_workspace_folder(mapper, object_id, target_path, source_path, context) asset_id = _cmis_asset_id(object_id) asset = self.repository.get_asset(asset_id) if not mapper.access_point.exposes_asset(asset, context): raise NotFoundError( "CMIS object not found", details={"object_id": object_id, "access_point_id": access_point_id}, ) current_path = _normalize_cmis_path(str(asset.metadata.get("cmis_path") or mapper.asset_path(asset))) current_parent = _path_parent(current_path) if source_path and source_path != current_parent: raise ValidationError( "CMIS source folder does not match current object parent", details={ "operation": "moveObject", "source_folder_id": source_folder_id, "current_parent": current_parent, }, ) file_name = str(asset.metadata.get("file_name") or _path_name(current_path) or asset.title) new_path = _normalize_cmis_path(f"{target_path}/{file_name}") if new_path == current_path: return self.cmis_object(access_point_id, object_id, context) self._validate_cmis_path_available(mapper, context, new_path, excluding_asset_id=asset_id) metadata = { **asset.metadata, "cmis_path": new_path, "cmis_parent_folder_id": "cmis-root" if target_path == "/" else mapper.folder_object_id(target_path), "file_name": file_name, } self.repository.save_asset(replace(asset, metadata=metadata)) return self.cmis_object(access_point_id, object_id, context) def representation_content_stream( self, asset_id: str, representation_id: str, context: OperationContext, ): return self.content_service().stream_content( asset_id, context, representation_id=representation_id, ) def cmis_delete_object( self, access_point_id: str, object_id: str, payload: dict[str, Any], context: OperationContext, ) -> dict[str, Any]: mapper = self._cmis_mapper(access_point_id) decision = mapper.access_point.decide_action(CMISAction.DELETE_OBJECT, context, resource=object_id) if not decision.allowed: raise _cmis_authorization_error(decision, "deleteObject") if object_id.startswith("cmis:folder:"): workspace_folder = self._cmis_workspace_folder_by_object_id(access_point_id, object_id) folder_path = workspace_folder.path if workspace_folder is not None else (_cmis_folder_path(object_id) or "/") if folder_path == "/": raise ValidationError("CMIS root folder cannot be deleted", details={"operation": "deleteObject"}) folders = self._cmis_workspace_folder_map(access_point_id) if folder_path not in folders: raise NotFoundError( "CMIS folder not found", details={"object_id": object_id, "access_point_id": access_point_id}, ) children = self._cmis_children_for_folder(mapper, context, folder_path=folder_path) if children: raise ValidationError( "CMIS folder is not empty", details={"operation": "deleteObject", "object_id": object_id, "child_count": len(children)}, ) del folders[folder_path] return { "object_id": object_id, "deleted": True, "lifecycle": LifecycleState.DELETE_REQUESTED.value, "profile": access_point_id, } asset_id = _cmis_asset_id(object_id) result = self.asset_service().request_delete( asset_id, context, expected_current_version_id=payload.get("expected_current_version_id"), ) return { "object_id": mapper.asset_object_id(asset_id), "deleted": False, "lifecycle": result.asset.lifecycle.value, "version": result.version.to_dict(), "audit_event": result.audit_event.to_dict(), "policy_decision": result.policy_decision.to_dict(), } def cmis_delete_tree( self, access_point_id: str, object_id: str, payload: dict[str, Any], context: OperationContext, ) -> dict[str, Any]: mapper = self._cmis_mapper(access_point_id) decision = mapper.access_point.decide_action(CMISAction.DELETE_TREE, context, resource=object_id) if not decision.allowed: raise _cmis_authorization_error(decision, "deleteTree") workspace_folder = self._cmis_workspace_folder_by_object_id(access_point_id, object_id) folder_path = workspace_folder.path if workspace_folder is not None else _cmis_folder_path(object_id) if folder_path in (None, "/"): raise ValidationError("CMIS root folder cannot be deleteTree target", details={"operation": "deleteTree"}) failed_to_delete: list[str] = [] deleted_assets = 0 for asset in list(self.repository.list_assets()): if not mapper.access_point.exposes_asset(asset, context): continue if not any(_path_contains(folder_path, path) for path in mapper.asset_paths(asset)): continue try: self.asset_service().request_delete( asset.id, context, expected_current_version_id=payload.get("expected_current_version_id"), ) deleted_assets += 1 except Exception: failed_to_delete.append(mapper.asset_object_id(asset.id)) folders = self._cmis_workspace_folder_map(access_point_id) folder_paths = [ path for path in folders if path == folder_path or _path_contains(folder_path, path) ] for path in sorted(folder_paths, key=lambda item: item.count("/"), reverse=True): del folders[path] return { "failedToDelete": failed_to_delete, "failed_to_delete": failed_to_delete, "deleted": len(folder_paths) + deleted_assets, "deleted_folders": len(folder_paths), "deleted_assets": deleted_assets, "profile": access_point_id, } def cmis_query( self, access_point_id: str, query: str, context: OperationContext, *, skip_count: int = 0, max_items: int = 100, ) -> dict[str, Any]: mapper = self._cmis_mapper(access_point_id) decision = mapper.access_point.decide_action(CMISAction.QUERY, context) if not decision.allowed: raise _cmis_authorization_error(decision, "query") query_spec = _parse_cmis_query(query) projections = self._cmis_document_projections(mapper, context) projections = _apply_cmis_query_spec(projections, query_spec) paged = projections[max(skip_count, 0) : max(skip_count, 0) + max(max_items, 0)] return { "query": query, "query_spec": query_spec, "results": paged, "num_items": len(paged), "has_more_items": len(projections) > max(skip_count, 0) + len(paged), "total_num_items": len(projections), } def cmis_relationships( self, access_point_id: str, context: OperationContext, *, object_id: str | None = None, target_id: str | None = None, relationship_direction: str | None = None, ) -> dict[str, Any]: mapper = self._cmis_mapper(access_point_id) decision = mapper.access_point.decide_action(CMISAction.GET_RELATIONSHIPS, context) if not decision.allowed: raise _cmis_authorization_error(decision, "getRelationships") direction = (relationship_direction or "source").strip().lower() if direction not in {"source", "target", "either"}: raise ValidationError( "Unsupported CMIS relationship direction", details={ "code": "cmis.relationship_direction_unsupported", "operation": "getObjectRelationships", "direction": relationship_direction, "supported": ["source", "target", "either"], }, ) source_filter = _cmis_asset_id(object_id) if object_id and direction == "source" else None target_filter = _cmis_asset_id(object_id) if object_id and direction == "target" else None if target_id: target_filter = _cmis_asset_id(target_id) relationships = self.repository.list_relationships(source_id=source_filter, target_id=target_filter) if object_id and direction == "either": asset_id = _cmis_asset_id(object_id) relationships = [ relationship for relationship in relationships if relationship.source_id == asset_id or ( relationship.target_kind == RelationshipTargetKind.ASSET and relationship.target_id == asset_id ) ] projections = [ projection.to_dict() for relationship in relationships if self._cmis_relationship_visible(mapper, relationship, context) if (projection := mapper.map_relationship(relationship, context)) ] return { "items": projections, "count": len(projections), "filters": { "object_id": object_id, "target_id": target_id, "relationship_direction": direction, }, } def cmis_change_log( self, access_point_id: str, context: OperationContext, *, skip_count: int = 0, max_items: int = 100, ) -> dict[str, Any]: mapper = self._cmis_mapper(access_point_id) decision = mapper.access_point.decide_action(CMISAction.GET_CHANGE_LOG, context) if not decision.allowed: raise _cmis_authorization_error(decision, "getContentChanges") events = self.repository.list_audit_events() changes = [ { "change_id": event.event_id, "change_type": _cmis_change_type(event.operation), "object_id": event.target.replace("asset:", "cmis:asset:", 1), "change_time": event.occurred_at, "actor_id": event.actor_id, "correlation_id": event.correlation_id, } for event in events if event.target.startswith("asset:") if self._cmis_asset_visible(mapper, event.target.removeprefix("asset:"), context) ] paged = changes[max(skip_count, 0) : max(skip_count, 0) + max(max_items, 0)] return { "change_log_token": changes[-1]["change_id"] if changes else None, "changes": paged, "num_items": len(paged), "has_more_items": len(changes) > max(skip_count, 0) + len(paged), "total_num_items": len(changes), } def _cmis_mapper(self, access_point_id: str) -> CMISDomainMapper: return CMISDomainMapper(self._cmis_access_point(access_point_id)) def _cmis_access_point(self, access_point_id: str) -> CMISAccessPoint: for profile in _cmis_profiles(): if profile.name == access_point_id: return _cmis_access_point(profile) raise NotFoundError( "CMIS access point not found", details={"access_point_id": access_point_id, "available": [profile.name for profile in _cmis_profiles()]}, ) def _cmis_asset_visible( self, mapper: CMISDomainMapper, asset_id: str, context: OperationContext, ) -> bool: try: return mapper.access_point.exposes_asset(self.repository.get_asset(asset_id), context) except NotFoundError: return False def _cmis_relationship_visible( self, mapper: CMISDomainMapper, relationship: Any, context: OperationContext, ) -> bool: if not self._cmis_asset_visible(mapper, relationship.source_id, context): return False if relationship.target_kind == RelationshipTargetKind.ASSET: return self._cmis_asset_visible(mapper, relationship.target_id, context) return True def _cmis_children_for_folder( self, mapper: CMISDomainMapper, context: OperationContext, *, folder_path: str | None, ) -> list[dict[str, Any]]: assets = [ asset for asset in self.repository.list_assets() if mapper.access_point.exposes_asset(asset, context) ] access_point_id = mapper.access_point.access_point_id workspace_folders = self._cmis_workspace_folder_map(access_point_id) if folder_path in (None, "/"): child_folder_paths = set() for asset in assets: for path in mapper.asset_paths(asset): first = path.strip("/").split("/")[0] if first: child_folder_paths.add("/" + first) workspace_children = [ self._cmis_workspace_folder_projection(mapper, folder) for folder in workspace_folders.values() if _path_parent(folder.path) == "/" ] projection_children = [ mapper.folder_projection(path) for path in sorted(child_folder_paths) if path not in workspace_folders ] return sorted(workspace_children + projection_children, key=_cmis_child_order_key) children: list[dict[str, Any]] = [] folder_path = _normalize_cmis_path(folder_path) child_folder_paths: set[str] = set() for folder in workspace_folders.values(): if _path_parent(folder.path) == folder_path: children.append(self._cmis_workspace_folder_projection(mapper, folder)) for asset in assets: for path in mapper.asset_paths(asset): parent = _path_parent(path) if parent == folder_path: projection = mapper.map_asset( asset, context, representations=self._cmis_asset_representations(asset), versions=self.repository.list_versions(asset.id), relationship_ids=[ f"cmis:relationship:{relationship.relationship_id}" for relationship in self.repository.list_relationships(source_id=asset.id) if self._cmis_relationship_visible(mapper, relationship, context) ], metadata_records=self.repository.list_metadata_records(asset.id), ) if projection is not None: children.append(projection.to_dict()) elif _path_parent(parent) == folder_path: child_folder_paths.add(parent) projection_children = [ mapper.folder_projection(path) for path in sorted(child_folder_paths) if path not in workspace_folders ] return sorted(projection_children + children, key=_cmis_child_order_key) def _cmis_workspace_folder_map(self, access_point_id: str) -> dict[str, CMISWorkspaceFolder]: return self.cmis_workspace_folders.setdefault(access_point_id, {}) def _cmis_asset_representations(self, asset) -> list[AssetRepresentation]: if asset.metadata.get("cmis_content_deleted"): return [] return self.repository.list_representations(asset_id=asset.id) def _cmis_workspace_folder_by_object_id( self, access_point_id: str, object_id: str, ) -> CMISWorkspaceFolder | None: for folder in self._cmis_workspace_folder_map(access_point_id).values(): if folder.object_id == object_id: return folder return None def _cmis_folder_projection(self, access_point_id: str, folder_path: str) -> dict[str, Any]: mapper = self._cmis_mapper(access_point_id) normalized = _normalize_cmis_path(folder_path) if normalized == "/": return cmis_browser_root_folder(mapper.access_point) folder = self._cmis_workspace_folder_map(access_point_id).get(normalized) if folder is not None: return self._cmis_workspace_folder_projection(mapper, folder) return mapper.folder_projection(normalized) def _validate_cmis_path_available( self, mapper: CMISDomainMapper, context: OperationContext, path: str, *, excluding_asset_id: str | None = None, excluding_folder_path: str | None = None, ) -> None: normalized = _normalize_cmis_path(path) excluded_folder = _normalize_cmis_path(excluding_folder_path) if excluding_folder_path else None if normalized != excluded_folder and normalized in self._cmis_workspace_folder_map(mapper.access_point.access_point_id): raise ValidationError( "CMIS path already exists", details={"operation": "moveObject", "path": normalized, "kind": "folder"}, ) for asset in self.repository.list_assets(): if excluding_asset_id and asset.id == excluding_asset_id: continue if not mapper.access_point.exposes_asset(asset, context): continue if normalized in mapper.asset_paths(asset): raise ValidationError( "CMIS path already exists", details={"operation": "moveObject", "path": normalized, "kind": "document"}, ) def _cmis_update_workspace_folder( self, mapper: CMISDomainMapper, object_id: str, properties: dict[str, Any], context: OperationContext, ) -> dict[str, Any]: folder = self._cmis_workspace_folder_by_object_id(mapper.access_point.access_point_id, object_id) if folder is None: folder_path = _cmis_folder_path(object_id) if folder_path in (None, "/"): raise ValidationError("CMIS root folder cannot be updated", details={"operation": "updateProperties"}) folder = self._cmis_workspace_folder_map(mapper.access_point.access_point_id).get(folder_path) if folder is None: raise NotFoundError( "CMIS folder not found", details={"operation": "updateProperties", "object_id": object_id}, ) supported = {"cmis:name", "cmis:description", "cmis:secondaryObjectTypeIds"} unsupported = sorted(key for key in properties if key.startswith("cmis:") and key not in supported) if unsupported: raise ValidationError( "Unsupported CMIS folder property update", details={"operation": "updateProperties", "unsupported": unsupported, "supported": sorted(supported)}, ) new_name = properties.get("cmis:name") if new_name is None or str(new_name) == folder.name: updated = replace(folder, updated_at=utc_now().isoformat()) self._cmis_workspace_folder_map(mapper.access_point.access_point_id)[folder.path] = updated return self._cmis_workspace_folder_projection(mapper, updated) name = str(new_name).strip() if not name: raise ValidationError("CMIS name cannot be empty", details={"operation": "updateProperties"}) old_path = folder.path parent_path = _path_parent(old_path) new_path = _normalize_cmis_path(f"{parent_path}/{name}") if new_path != old_path: self._validate_cmis_path_available( mapper, context, new_path, excluding_folder_path=old_path, ) folders = self._cmis_workspace_folder_map(mapper.access_point.access_point_id) moving = { path: value for path, value in folders.items() if path == old_path or _path_contains(old_path, path) } now = utc_now().isoformat() for path in sorted(moving, key=lambda item: item.count("/"), reverse=True): del folders[path] for path, value in sorted(moving.items(), key=lambda item: item[0].count("/")): suffix = path.removeprefix(old_path) moved_path = _normalize_cmis_path(f"{new_path}{suffix}") moved_parent = _path_parent(moved_path) folders[moved_path] = replace( value, path=moved_path, name=_path_name(moved_path), parent_id="cmis-root" if moved_parent == "/" else mapper.folder_object_id(moved_parent), updated_at=now, ) for asset in self.repository.list_assets(): explicit_path = asset.metadata.get("cmis_path") if not explicit_path: continue asset_path = _normalize_cmis_path(str(explicit_path)) if not _path_contains(old_path, asset_path): continue suffix = asset_path.removeprefix(old_path) moved_asset_path = _normalize_cmis_path(f"{new_path}{suffix}") moved_parent = _path_parent(moved_asset_path) self.repository.save_asset( replace( asset, metadata={ **asset.metadata, "cmis_path": moved_asset_path, "cmis_parent_folder_id": "cmis-root" if moved_parent == "/" else mapper.folder_object_id(moved_parent), }, ) ) return self._cmis_workspace_folder_projection(mapper, folders[new_path]) def _cmis_move_workspace_folder( self, mapper: CMISDomainMapper, object_id: str, target_path: str, source_path: str | None, context: OperationContext, ) -> dict[str, Any]: workspace_folder = self._cmis_workspace_folder_by_object_id(mapper.access_point.access_point_id, object_id) folder_path = workspace_folder.path if workspace_folder is not None else _cmis_folder_path(object_id) if folder_path in (None, "/"): raise ValidationError("CMIS root folder cannot be moved", details={"operation": "moveObject"}) folders = self._cmis_workspace_folder_map(mapper.access_point.access_point_id) folder = folders.get(folder_path) if folder is None: raise ValidationError( "Only adapter-managed CMIS workspace folders can be moved", details={"operation": "moveObject", "object_id": object_id}, ) current_parent = _path_parent(folder_path) if source_path and source_path != current_parent: raise ValidationError( "CMIS source folder does not match current object parent", details={ "operation": "moveObject", "source_folder_id": source_path, "current_parent": current_parent, }, ) new_path = _normalize_cmis_path(f"{target_path}/{folder.name}") if new_path == folder_path: return self._cmis_workspace_folder_projection(mapper, folder) if _path_contains(folder_path, target_path): raise ValidationError( "CMIS folder cannot be moved below itself", details={"operation": "moveObject", "object_id": object_id, "target_path": target_path}, ) self._validate_cmis_path_available(mapper, context, new_path, excluding_folder_path=folder_path) now = utc_now().isoformat() moving = { path: value for path, value in folders.items() if path == folder_path or _path_contains(folder_path, path) } for path in sorted(moving, key=lambda item: item.count("/"), reverse=True): del folders[path] for path, value in sorted(moving.items(), key=lambda item: item[0].count("/")): suffix = path.removeprefix(folder_path) moved_path = _normalize_cmis_path(f"{new_path}{suffix}") parent_path = _path_parent(moved_path) folders[moved_path] = replace( value, object_id=mapper.folder_object_id(moved_path), path=moved_path, name=_path_name(moved_path), parent_id="cmis-root" if parent_path == "/" else mapper.folder_object_id(parent_path), updated_at=now, ) for asset in self.repository.list_assets(): explicit_path = asset.metadata.get("cmis_path") if not explicit_path: continue asset_path = _normalize_cmis_path(str(explicit_path)) if not _path_contains(folder_path, asset_path): continue suffix = asset_path.removeprefix(folder_path) moved_asset_path = _normalize_cmis_path(f"{new_path}{suffix}") parent_path = _path_parent(moved_asset_path) self.repository.save_asset( replace( asset, metadata={ **asset.metadata, "cmis_path": moved_asset_path, "cmis_parent_folder_id": "cmis-root" if parent_path == "/" else mapper.folder_object_id(parent_path), }, ) ) return self._cmis_workspace_folder_projection(mapper, folders[new_path]) def _cmis_folder_exists( self, mapper: CMISDomainMapper, context: OperationContext, folder_path: str, ) -> bool: normalized = _normalize_cmis_path(folder_path) if normalized == "/": return True workspace_folders = self._cmis_workspace_folder_map(mapper.access_point.access_point_id) if normalized in workspace_folders: return True if any(_path_contains(normalized, folder.path) for folder in workspace_folders.values()): return True for asset in self.repository.list_assets(): if not mapper.access_point.exposes_asset(asset, context): continue if any(_path_contains(normalized, path) for path in mapper.asset_paths(asset)): return True return False def _cmis_workspace_folder_projection( self, mapper: CMISDomainMapper, folder: CMISWorkspaceFolder, ) -> dict[str, Any]: projection = mapper.folder_projection(folder.path) projection["object_id"] = folder.object_id projection["name"] = folder.name projection["path"] = folder.path projection["properties"].update( { "cmis:objectId": folder.object_id, "cmis:name": folder.name, "cmis:objectTypeId": CMISBaseType.FOLDER.value, "cmis:createdBy": folder.created_by, "cmis:lastModifiedBy": folder.created_by, "cmis:creationDate": folder.created_at, "cmis:lastModificationDate": folder.updated_at, "cmis:changeToken": f"folder:{folder.updated_at}", "cmis:parentId": folder.parent_id, "cmis:description": "Adapter-managed CMIS workspace folder", "kontextual:workspaceFolder": True, } ) actions = set(projection.get("allowable_actions", [])) actions.update( { CMISAction.GET_OBJECT.value, CMISAction.GET_CHILDREN.value, CMISAction.GET_OBJECT_PARENTS.value, } ) if mapper.access_point.profile.allow_mutations: actions.update( { CMISAction.CREATE_DOCUMENT.value, CMISAction.CREATE_FOLDER.value, CMISAction.DELETE_OBJECT.value, CMISAction.DELETE_TREE.value, } ) projection["allowable_actions"] = sorted(actions) return projection def _cmis_document_projections( self, mapper: CMISDomainMapper, context: OperationContext, ) -> list[dict[str, Any]]: projections = [] for asset in self.repository.list_assets(): projection = mapper.map_asset( asset, context, representations=self._cmis_asset_representations(asset), versions=self.repository.list_versions(asset.id), relationship_ids=[ f"cmis:relationship:{relationship.relationship_id}" for relationship in self.repository.list_relationships(source_id=asset.id) if self._cmis_relationship_visible(mapper, relationship, context) ], metadata_records=self.repository.list_metadata_records(asset.id), ) if projection is not None: projections.append(projection.to_dict()) return projections def create_asset(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: classification = Classification.from_dict(payload["classification"]) result = self.asset_service().create_asset( payload["title"], classification, context, asset_id=payload.get("asset_id"), source_refs=[_source_reference(item) for item in payload.get("source_refs", ())], metadata_records=[_metadata_record(item) for item in payload.get("metadata_records", ())], idempotency_key=payload.get("idempotency_key"), ) return _asset_change_result(result) def get_asset(self, asset_id: str) -> dict[str, Any]: return self.asset_service().get_asset(asset_id).to_dict() def list_assets( self, *, lifecycle: str | None = None, asset_type: str | None = None, sensitivity: str | None = None, owner: str | None = None, topic: str | None = None, review_state: str | None = None, ) -> dict[str, Any]: assets = self.asset_service().list_assets( lifecycle=LifecycleState(lifecycle) if lifecycle else None, asset_type=asset_type, sensitivity=sensitivity, owner=owner, topic=topic, review_state=review_state, ) return {"items": [asset.to_dict() for asset in assets], "count": len(assets)} def add_metadata_record( self, asset_id: str, payload: dict[str, Any], context: OperationContext, ) -> dict[str, Any]: result = self.asset_service().add_metadata_record( asset_id, _metadata_record(payload), context, expected_current_version_id=payload.get("expected_current_version_id"), ) return _asset_change_result(result) def list_metadata_records(self, asset_id: str) -> dict[str, Any]: records = self.repository.list_metadata_records(asset_id) return {"items": [record.to_dict() for record in records], "count": len(records)} def transition_lifecycle( self, asset_id: str, payload: dict[str, Any], context: OperationContext, ) -> dict[str, Any]: result = self.asset_service().transition_lifecycle( asset_id, LifecycleState(payload["lifecycle"]), context, expected_current_version_id=payload.get("expected_current_version_id"), ) return _asset_change_result(result) def create_relationship(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: target_kind = RelationshipTargetKind(payload.get("target_kind", RelationshipTargetKind.ASSET.value)) service = self.asset_service() if target_kind == RelationshipTargetKind.CONTEXT_ENTITY: entity_payload = payload.get("context_entity") or {} entity = ContextEntity( entity_id=payload["target_id"], entity_type=ContextEntityType(entity_payload.get("entity_type", ContextEntityType.TOPIC.value)), name=entity_payload.get("name", payload["target_id"]), external_ref=entity_payload.get("external_ref"), metadata=dict(entity_payload.get("metadata", {})), ) result = service.link_asset_to_context_entity( payload["source_asset_id"], entity, payload["predicate"], context, confidence=payload.get("confidence"), provenance=dict(payload.get("provenance", {})), expected_current_version_id=payload.get("expected_current_version_id"), ) else: result = service.link_asset_to_asset( payload["source_asset_id"], payload["target_id"], payload["predicate"], context, confidence=payload.get("confidence"), provenance=dict(payload.get("provenance", {})), expected_current_version_id=payload.get("expected_current_version_id"), ) return { "relationship": result.relationship.to_dict(), "version": result.version.to_dict(), "audit_event": result.audit_event.to_dict(), "policy_decision": result.policy_decision.to_dict(), } def list_relationships( self, *, source_id: str | None = None, target_id: str | None = None, ) -> dict[str, Any]: relationships = self.repository.list_relationships(source_id=source_id, target_id=target_id) return { "items": [relationship.to_dict() for relationship in relationships], "count": len(relationships), } def list_audit_events( self, *, target: str | None = None, correlation_id: str | None = None, ) -> dict[str, Any]: events = self.repository.list_audit_events(target=target, correlation_id=correlation_id) return {"items": [event.to_dict() for event in events], "count": len(events)} def evaluate_policy(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: try: decision = self.policy_gateway.authorize( context, payload["action"], payload["resource"], resource_metadata=dict(payload.get("resource_metadata", {})), ) except Exception as exc: decision = PolicyDecision.fail_closed( context.actor.id, payload.get("action", "unknown"), payload.get("resource", "unknown"), reason=str(exc), context={"gateway_error": type(exc).__name__}, ) return decision.to_dict() def ingestion_capabilities(self) -> dict[str, Any]: service = self.ingestion_service() return { "connectors": service.connector_capabilities(), "extractors": service.extractor_capabilities(), } def start_ingestion_job(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: service = self.ingestion_service() mode = payload.get("mode", "file") classification = _optional_classification(payload.get("classification")) identity_policy = payload.get("identity_policy", IngestionIdentityPolicy.SOURCE_LOCATION.value) if mode == "directory": job = service.ingest_directory( payload["path"], context, recursive=bool(payload.get("recursive", True)), classification=classification, identity_policy=identity_policy, skip_unchanged=bool(payload.get("skip_unchanged", True)), ) return _ingestion_job_envelope(job) if mode != "file": raise ValidationError( "Unsupported ingestion mode", details={"mode": mode, "supported": ["file", "directory"]}, ) result = service.ingest_file( payload["path"], context, asset_id=payload.get("asset_id"), title=payload.get("title"), classification=classification, idempotency_key=payload.get("idempotency_key"), identity_policy=identity_policy, skip_unchanged=bool(payload.get("skip_unchanged", True)), ) return _ingestion_result_envelope(result) def get_ingestion_job(self, job_id: str) -> dict[str, Any]: return _ingestion_job_envelope(self.ingestion_service().get_job(job_id)) def list_ingestion_jobs(self, *, status: str | None = None) -> dict[str, Any]: parsed_status = _enum_filter(IngestionJobStatus, status, "ingestion job status") jobs = self.ingestion_service().list_jobs(status=parsed_status) return {"items": [_ingestion_job_envelope(job) for job in jobs], "count": len(jobs)} def refresh_retrieval_index(self) -> dict[str, Any]: return self.retrieval_service().refresh_index().to_dict() def query_assets(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: service = self.retrieval_service() service.refresh_index() return service.query_assets(_asset_query_request(payload), context).to_dict() def query_context_entities(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: return self.retrieval_service().query_context_entities( _context_entity_query_request(payload), context, ).to_dict() def query_relationships(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: return self.retrieval_service().query_relationships( _relationship_query_request(payload), context, ).to_dict() def record_retrieval_feedback(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: return self.retrieval_service().record_feedback( RetrievalFeedbackRequest( label=payload["label"], query=dict(payload.get("query", {})), result_ref=dict(payload.get("result_ref", {})), notes=payload.get("notes"), metadata=dict(payload.get("metadata", {})), ), context, ).to_dict() def list_retrieval_feedback( self, *, correlation_id: str | None = None, label: str | None = None, ) -> dict[str, Any]: parsed_label = _enum_filter(RetrievalFeedbackLabel, label, "retrieval feedback label") records = self.retrieval_service().list_feedback(correlation_id=correlation_id, label=parsed_label) return {"items": [record.to_dict() for record in records], "count": len(records)} def retrieval_quality_metrics(self) -> dict[str, Any]: return self.retrieval_service().quality_metrics().to_dict() def list_transformation_operations(self) -> dict[str, Any]: operations = self.transformation_service().list_operations() return {"items": [operation.to_dict() for operation in operations], "count": len(operations)} def execute_transformation(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: result = self.transformation_service().execute_transformation( _transformation_request(payload), context, ) return _transformation_result_envelope(result) def get_transformation_run(self, run_id: str) -> dict[str, Any]: return _transformation_run_envelope(self.transformation_service().get_run(run_id)) def list_transformation_runs( self, *, status: str | None = None, operation_id: str | None = None, ) -> dict[str, Any]: parsed_status = _enum_filter(TransformationRunStatus, status, "transformation run status") runs = self.transformation_service().list_runs(status=parsed_status, operation_id=operation_id) return {"items": [_transformation_run_envelope(run) for run in runs], "count": len(runs)} def retry_transformation_run(self, run_id: str, context: OperationContext) -> dict[str, Any]: return _transformation_result_envelope(self.transformation_service().retry_run(run_id, context)) def cancel_transformation_run( self, run_id: str, payload: dict[str, Any], context: OperationContext, ) -> dict[str, Any]: run = self.transformation_service().cancel_run(run_id, context, reason=payload.get("reason")) return _transformation_run_envelope(run) def register_workflow_template(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: return self.workflow_service().register_template(_workflow_template(payload), context).to_dict() def get_workflow_template(self, template_id: str, *, version: str | None = None) -> dict[str, Any]: return self.workflow_service().get_template(template_id, version=version).to_dict() def list_workflow_templates(self, *, template_id: str | None = None) -> dict[str, Any]: templates = self.workflow_service().list_templates(template_id=template_id) return {"items": [template.to_dict() for template in templates], "count": len(templates)} def queue_workflow_run(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: return _workflow_result_envelope( self.workflow_service().queue_template(_workflow_invocation(payload), context) ) def invoke_workflow_run(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: return _workflow_result_envelope( self.workflow_service().invoke_template(_workflow_invocation(payload), context) ) def get_workflow_run(self, run_id: str) -> dict[str, Any]: return _workflow_run_envelope(self.repository.get_workflow_run(run_id)) def list_workflow_runs( self, *, status: str | None = None, template_id: str | None = None, ) -> dict[str, Any]: parsed_status = _enum_filter(WorkflowRunStatus, status, "workflow run status") runs = self.repository.list_workflow_runs(status=parsed_status, template_id=template_id) return {"items": [_workflow_run_envelope(run) for run in runs], "count": len(runs)} def resume_workflow_run(self, run_id: str, context: OperationContext) -> dict[str, Any]: return _workflow_result_envelope(self.workflow_service().resume_run(run_id, context)) def retry_workflow_run(self, run_id: str, context: OperationContext) -> dict[str, Any]: return _workflow_result_envelope(self.workflow_service().retry_run(run_id, context)) def cancel_workflow_run( self, run_id: str, payload: dict[str, Any], context: OperationContext, ) -> dict[str, Any]: run = self.workflow_service().cancel_run(run_id, context, reason=payload.get("reason")) return _workflow_run_envelope(run) def reconstruct_workflow_run(self, run_id: str) -> dict[str, Any]: return self.workflow_service().reconstruct_run(run_id).to_dict() def list_workflow_review_tasks( self, *, status: str | None = WorkflowReviewStatus.OPEN.value, workflow_run_id: str | None = None, ) -> dict[str, Any]: parsed_status = _enum_filter(WorkflowReviewStatus, status, "workflow review status") reviews = self.workflow_service().list_review_tasks( status=parsed_status, workflow_run_id=workflow_run_id, ) return {"items": [review.to_dict() for review in reviews], "count": len(reviews)} def list_workflow_exceptions( self, *, status: str | None = WorkflowExceptionStatus.OPEN.value, kind: str | None = None, workflow_run_id: str | None = None, ) -> dict[str, Any]: parsed_status = _enum_filter(WorkflowExceptionStatus, status, "workflow exception status") parsed_kind = _enum_filter(WorkflowExceptionKind, kind, "workflow exception kind") exceptions = self.workflow_service().list_exception_queue( status=parsed_status, kind=parsed_kind, workflow_run_id=workflow_run_id, ) return {"items": [exception.to_dict() for exception in exceptions], "count": len(exceptions)} def record_workflow_review_decision( self, run_id: str, review_id: str, payload: dict[str, Any], context: OperationContext, ) -> dict[str, Any]: decision = payload.get("decision", WorkflowReviewDecisionType.CONTINUE.value) return _workflow_result_envelope( self.workflow_service().record_review_decision( run_id, review_id, decision, context, note=payload.get("note", ""), correction=dict(payload.get("correction", {})), ) ) def list_agent_operations(self) -> dict[str, Any]: return {"items": [dict(item) for item in AGENT_OPERATION_CATALOG], "count": len(AGENT_OPERATION_CATALOG)} def get_agent_operation(self, operation_id: str) -> dict[str, Any]: return dict(_agent_operation(operation_id)) def execute_agent_operation( self, operation_id: str, payload: dict[str, Any], context: OperationContext, ) -> dict[str, Any]: operation = _agent_operation(operation_id) dry_run = bool(payload.get("dry_run", False)) operation_payload = dict(payload.get("payload", payload)) operation_payload.pop("dry_run", None) decision = self._authorize_agent_operation(operation, operation_payload, context) effect = decision.effect if effect == PolicyEffect.REQUIRE_REVIEW: event = self._audit_agent_operation( operation, AuditOutcome.REVIEW_REQUIRED, context, decision, details={"phase": "review_required", "payload_keys": sorted(operation_payload)}, ) return _agent_review_required_envelope(operation, decision, event, context) if effect == PolicyEffect.DRY_RUN_ONLY and not dry_run: event = self._audit_agent_operation( operation, AuditOutcome.DRY_RUN, context, decision, details={"phase": "dry_run_required", "payload_keys": sorted(operation_payload)}, ) return _agent_dry_run_required_envelope(operation, decision, event, context) queued_event = self._audit_agent_operation( operation, AuditOutcome.DRY_RUN if dry_run else AuditOutcome.SUCCESS, context, decision, details={ "phase": "accepted", "dry_run": dry_run, "payload_keys": sorted(operation_payload), }, ) if dry_run: return { "operation_id": operation_id, "dry_run": True, "success": True, "would_execute": operation, "correlation_id": context.correlation_id, "policy_decision": decision.to_dict(), "audit_event": queued_event.to_dict(), } try: result = self._dispatch_agent_operation(operation_id, operation_payload, context) except Exception as exc: failed_event = self._audit_agent_operation( operation, AuditOutcome.FAILED, context, decision, details={ "phase": "failed", "error_type": type(exc).__name__, "payload_keys": sorted(operation_payload), }, ) if isinstance(exc, KontextualError): exc.details.setdefault("agent_audit_event_id", failed_event.event_id) raise completed_event = self._audit_agent_operation( operation, AuditOutcome.SUCCESS, context, decision, details={ "phase": "completed", "result_keys": sorted(result) if isinstance(result, dict) else [], }, ) return { "operation_id": operation_id, "dry_run": False, "success": True, "correlation_id": context.correlation_id, "result": result, "policy_decision": decision.to_dict(), "audit_event": completed_event.to_dict(), } def _dispatch_agent_operation( self, operation_id: str, payload: dict[str, Any], context: OperationContext, ) -> dict[str, Any]: if operation_id == "inspect_asset": return self.get_asset(payload["asset_id"]) if operation_id == "retrieve_asset": return self._asset_bundle(payload["asset_id"]) if operation_id == "search_assets": return self.query_assets(dict(payload.get("query", {})), context) if operation_id == "assemble_context": query_result = self.query_assets(dict(payload.get("query", {})), context) return { "context_preview": { "intent": payload.get("intent", "Support a bounded agent task with source-grounded context."), "instructions": payload.get("instructions", ""), "constraints": dict(payload.get("constraints", {})), "correlation_id": query_result["correlation_id"], "source_grounded": True, "items": query_result["results"], "result_count": query_result["result_count"], "total": query_result["total"], } } if operation_id == "enrich_metadata": return self.add_metadata_record(payload["asset_id"], dict(payload["metadata"]), context) if operation_id == "classify_asset": request = { "operation_id": "classify", "source_asset_ids": [payload["asset_id"]], "parameters": dict(payload.get("parameters", {})), "metadata": dict(payload.get("metadata", {})), } return self.execute_transformation(request, context) if operation_id == "transform_asset": return self.execute_transformation(dict(payload["transformation"]), context) if operation_id == "invoke_workflow": return self.invoke_workflow_run(dict(payload["workflow"]), context) if operation_id == "submit_review": return self.record_workflow_review_decision( payload["run_id"], payload["review_id"], payload, context, ) if operation_id == "report_result": return self._agent_report(payload, context) raise ValidationError("Unsupported agent operation", details={"operation_id": operation_id}) def _authorize_agent_operation( self, operation: dict[str, Any], payload: dict[str, Any], context: OperationContext, ) -> PolicyDecision: action = f"agent.operation.{operation['operation_id']}" resource = f"agent_operation:{operation['operation_id']}" try: decision = self.policy_gateway.authorize( context, action, resource, resource_metadata={ "operation": operation, "payload_keys": sorted(payload), "dry_run_supported": operation["dry_run_supported"], }, ) except Exception as exc: decision = PolicyDecision.fail_closed( context.actor.id, action, resource, reason=str(exc) or "Agent operation policy gateway failed", context={"gateway_error": type(exc).__name__}, ) if not decision.allowed and decision.effect not in ( PolicyEffect.REQUIRE_REVIEW, PolicyEffect.DRY_RUN_ONLY, ): event = self._audit_agent_operation(operation, AuditOutcome.DENIED, context, decision) raise AuthorizationError( "Operation denied by policy", details={ "action": action, "resource": resource, "correlation_id": context.correlation_id, "agent_audit_event_id": event.event_id, "policy_decision": decision.to_dict(), }, ) return decision def _audit_agent_operation( self, operation: dict[str, Any], outcome: AuditOutcome, context: OperationContext, policy_decision: PolicyDecision, *, details: dict[str, Any] | None = None, ) -> AuditEvent: event = AuditEvent.from_context( operation["audit_operation"], f"agent_operation:{operation['operation_id']}", outcome, context, policy_decision=policy_decision, details=details, ) return self.repository.save_audit_event(event) def operational_metrics(self) -> dict[str, Any]: assets = self.repository.list_assets() ingestion_jobs = self.repository.list_ingestion_jobs() transformation_runs = self.repository.list_transformation_runs() workflow_runs = self.repository.list_workflow_runs() audit_events = self.repository.list_audit_events() retrieval_events = [event for event in audit_events if event.operation.startswith("retrieval.")] query_latencies = [ float(event.details["permission_filter_duration_ms"]) for event in retrieval_events if "permission_filter_duration_ms" in event.details ] failed_jobs = [job for job in ingestion_jobs if job.status == IngestionJobStatus.FAILED] failed_transformations = [run for run in transformation_runs if run.status == TransformationRunStatus.FAILED] failed_workflows = [run for run in workflow_runs if run.status == WorkflowRunStatus.FAILED] queue_ages = [ _age_seconds(job.created_at, job.completed_at or utc_now().isoformat()) for job in ingestion_jobs if job.status in (IngestionJobStatus.QUEUED, IngestionJobStatus.RUNNING) ] return { "generated_at": utc_now().isoformat(), "repository": type(self.repository).__name__, "assets": { "count": len(assets), "representations": len(self.repository.list_representations()), "relationships": len(self.repository.list_relationships()), "context_entities": len(self.repository.list_context_entities()), }, "ingestion": { "job_count": len(ingestion_jobs), "completed": _count_by_value(job.status.value for job in ingestion_jobs).get("completed", 0), "failed": len(failed_jobs), "partial": _count_by_value(job.status.value for job in ingestion_jobs).get("partially_completed", 0), "throughput_assets": sum(len(job.output_asset_ids) for job in ingestion_jobs), "failure_rate": _ratio(len(failed_jobs), len(ingestion_jobs)), }, "retrieval": { "query_events": len(retrieval_events), "average_permission_filter_duration_ms": _average(query_latencies), "quality": self.retrieval_quality_metrics(), }, "transformations": { "run_count": len(transformation_runs), "completed": _count_by_value(run.status.value for run in transformation_runs).get("completed", 0), "failed": len(failed_transformations), "failure_rate": _ratio(len(failed_transformations), len(transformation_runs)), }, "workflows": { "run_count": len(workflow_runs), "completed": _count_by_value(run.status.value for run in workflow_runs).get("completed", 0), "failed": len(failed_workflows), "waiting": _count_by_value(run.status.value for run in workflow_runs).get("waiting", 0), "failure_rate": _ratio(len(failed_workflows), len(workflow_runs)), }, "permissions": { "policy_events": len([event for event in audit_events if event.policy_decision is not None]), "denied_events": len([event for event in audit_events if event.outcome == AuditOutcome.DENIED]), "review_required_events": len( [event for event in audit_events if event.outcome == AuditOutcome.REVIEW_REQUIRED] ), }, "service": { "started_at": self.started_at, "uptime_seconds": _age_seconds(self.started_at, utc_now().isoformat()), "api_latency_observation_count": 0, }, "storage_index_health": self.readiness()["checks"], "queue_age_seconds": { "max": max(queue_ages) if queue_ages else 0.0, "average": _average(queue_ages), }, } def inspect_jobs( self, *, kind: str | None = None, status: str | None = None, correlation_id: str | None = None, ) -> dict[str, Any]: items: list[dict[str, Any]] = [] if kind in (None, "ingestion"): parsed = _enum_filter(IngestionJobStatus, status, "ingestion job status") if status else None for job in self.repository.list_ingestion_jobs(status=parsed): if correlation_id is None or job.correlation_id == correlation_id: items.append({"kind": "ingestion", **_ingestion_job_envelope(job)}) if kind in (None, "transformation"): parsed = _enum_filter(TransformationRunStatus, status, "transformation run status") if status else None for run in self.repository.list_transformation_runs(status=parsed): if correlation_id is None or run.correlation_id == correlation_id: items.append({"kind": "transformation", "run": _transformation_run_envelope(run)}) if kind in (None, "workflow"): parsed = _enum_filter(WorkflowRunStatus, status, "workflow run status") if status else None for run in self.repository.list_workflow_runs(status=parsed): if correlation_id is None or run.correlation_id == correlation_id: items.append({"kind": "workflow", "run": _workflow_run_envelope(run)}) return {"items": items, "count": len(items)} def operational_events( self, *, correlation_id: str | None = None, operation_prefix: str | None = None, ) -> dict[str, Any]: events = self.repository.list_audit_events(correlation_id=correlation_id) if operation_prefix: events = [event for event in events if event.operation.startswith(operation_prefix)] return { "items": [ { "event_id": event.event_id, "operation": event.operation, "target": event.target, "outcome": event.outcome.value, "actor_id": event.actor_id, "correlation_id": event.correlation_id, "occurred_at": event.occurred_at, "details": dict(event.details), } for event in events ], "count": len(events), } def recovery_actions(self) -> dict[str, Any]: actions = [ { "action": "retry_ingestion_job", "target": "ingestion_job", "required": ["job_id"], "permission": "operations.recovery.retry_ingestion_job", }, { "action": "retry_transformation_run", "target": "transformation_run", "required": ["run_id"], "permission": "operations.recovery.retry_transformation_run", }, { "action": "cancel_transformation_run", "target": "transformation_run", "required": ["run_id"], "permission": "operations.recovery.cancel_transformation_run", }, { "action": "retry_workflow_run", "target": "workflow_run", "required": ["run_id"], "permission": "operations.recovery.retry_workflow_run", }, { "action": "cancel_workflow_run", "target": "workflow_run", "required": ["run_id"], "permission": "operations.recovery.cancel_workflow_run", }, { "action": "refresh_retrieval_index", "target": "retrieval_index", "required": [], "permission": "operations.recovery.refresh_retrieval_index", }, { "action": "inspect_failure", "target": "job_or_run", "required": ["kind", "id"], "permission": "operations.recovery.inspect_failure", }, ] return {"items": actions, "count": len(actions)} def execute_recovery_action( self, action: str, payload: dict[str, Any], context: OperationContext, ) -> dict[str, Any]: decision = self._authorize_operator_action(f"operations.recovery.{action}", f"recovery:{action}", payload, context) if action == "retry_ingestion_job": job = self.repository.get_ingestion_job(payload["job_id"]) result = self.start_ingestion_job(_ingestion_retry_payload(job), context) elif action == "retry_transformation_run": result = self.retry_transformation_run(payload["run_id"], context) elif action == "cancel_transformation_run": result = self.cancel_transformation_run(payload["run_id"], payload, context) elif action == "retry_workflow_run": result = self.retry_workflow_run(payload["run_id"], context) elif action == "cancel_workflow_run": result = self.cancel_workflow_run(payload["run_id"], payload, context) elif action == "refresh_retrieval_index": result = self.refresh_retrieval_index() elif action == "inspect_failure": result = self._inspect_failure(payload) else: raise ValidationError( "Unsupported recovery action", details={"action": action, "supported": [item["action"] for item in self.recovery_actions()["items"]]}, ) event = self._audit_operator_action(f"operations.recovery.{action}", f"recovery:{action}", context, decision) return { "action": action, "success": True, "correlation_id": context.correlation_id, "result": result, "policy_decision": decision.to_dict(), "audit_event": event.to_dict(), } def create_export_package(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: asset_ids = self._export_asset_ids(payload, context) decision = self._authorize_operator_action( "export.package.create", "export_package:new", {"asset_ids": asset_ids, "scope": dict(payload.get("scope", {}))}, context, ) records = [_export_asset_bundle(self.repository, asset_id) for asset_id in asset_ids] audit_events = [ event.to_dict() for asset_id in asset_ids for event in self.repository.list_audit_events(target=f"asset:{asset_id}") ] package = { "kind": "kontextual.export_package", "schema_version": "1", "package_id": payload.get("package_id") or new_id("export"), "created_at": utc_now().isoformat(), "actor": context.actor.to_dict(), "correlation_id": context.correlation_id, "scope": dict(payload.get("scope", {})), "policy_context": decision.to_dict(), "records": records, "audit_refs": audit_events, "adapter_sections": _export_adapter_sections(records), } package["manifest"] = _export_manifest(package) event = self._audit_operator_action( "export.package.create", f"export_package:{package['package_id']}", context, decision, details={"asset_count": len(asset_ids), "export_hash": package["manifest"]["export_hash"]}, ) package["audit_event"] = event.to_dict() return package def validate_export_package(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: package = dict(payload.get("package", payload)) decision = self._authorize_operator_action( "export.package.validate", f"export_package:{package.get('package_id', 'unknown')}", {"package_id": package.get("package_id")}, context, ) expected = package.get("manifest", {}) actual = _export_manifest({key: value for key, value in package.items() if key != "manifest"}) issues = [] for key in ("asset_count", "metadata_count", "representation_count", "relationship_count", "version_count"): if expected.get(key) != actual.get(key): issues.append({"code": "export.count_mismatch", "field": key, "expected": expected.get(key), "actual": actual.get(key)}) if expected.get("export_hash") != actual.get("export_hash"): issues.append( { "code": "export.integrity_mismatch", "field": "export_hash", "expected": expected.get("export_hash"), "actual": actual.get("export_hash"), } ) event = self._audit_operator_action( "export.package.validate", f"export_package:{package.get('package_id', 'unknown')}", context, decision, details={"valid": not issues, "issue_count": len(issues)}, ) return { "valid": not issues, "issues": issues, "expected_manifest": expected, "actual_manifest": actual, "policy_decision": decision.to_dict(), "audit_event": event.to_dict(), } def governance_report(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: asset_ids = self._export_asset_ids(payload, context) if payload else [asset.id for asset in self.repository.list_assets()] decision = self._authorize_operator_action("governance.report.generate", "governance:report", payload, context) findings: list[dict[str, Any]] = [] for asset_id in asset_ids: asset = self.repository.get_asset(asset_id) metadata = self.repository.list_metadata_records(asset_id) if not asset.classification.owner: findings.append({"asset_id": asset_id, "code": "governance.owner_missing", "severity": "warning"}) if not metadata: findings.append({"asset_id": asset_id, "code": "governance.metadata_missing", "severity": "warning"}) if not asset.source_refs: findings.append({"asset_id": asset_id, "code": "governance.source_ref_missing", "severity": "error"}) if asset.classification.sensitivity.value in {"confidential", "restricted"}: has_review = any(record.key in {"review_state", "legal_hold", "retention"} for record in metadata) if not has_review: findings.append({"asset_id": asset_id, "code": "governance.sensitive_without_review_metadata", "severity": "warning"}) if not self.repository.list_audit_events(target=f"asset:{asset_id}"): findings.append({"asset_id": asset_id, "code": "governance.audit_missing", "severity": "error"}) event = self._audit_operator_action( "governance.report.generate", "governance:report", context, decision, details={"asset_count": len(asset_ids), "finding_count": len(findings)}, ) return { "generated_at": utc_now().isoformat(), "scope": {"asset_ids": asset_ids}, "summary": _count_by_value(finding["code"] for finding in findings), "findings": findings, "redaction": {"policy_enforced": True, "content_included": False}, "policy_decision": decision.to_dict(), "audit_event": event.to_dict(), } def extension_catalog(self) -> dict[str, Any]: ingestion = self.ingestion_capabilities() return { "source_connectors": ingestion["connectors"], "extractors": ingestion["extractors"], "transformations": self.list_transformation_operations()["items"], "event_types": _extension_event_types(), "backend_abstractions": [ "asset_registry_repository", "policy_gateway", "source_connector", "format_extractor", "transformation_operation_registry", "event_publisher", "search_index", "ai_model_adapter", ], "markitect_boundary": "Markdown parsing, selectors, contracts, snapshots, and markdown context-package rendering stay delegated to markitect-tool adapters.", } def emit_extension_event(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: event_type = payload["event_type"] if event_type not in _extension_event_types(): raise ValidationError( "Unsupported extension event type", details={"event_type": event_type, "supported": _extension_event_types()}, ) decision = self._authorize_operator_action( "extension.event.emit", f"extension_event:{event_type}", payload, context, ) event = self._audit_operator_action( f"extension.{event_type}", payload.get("target", f"extension_event:{event_type}"), context, decision, details={"payload": dict(payload.get("payload", {})), "metadata": dict(payload.get("metadata", {}))}, ) return {"event": event.to_dict(), "policy_decision": decision.to_dict()} def record_quality_signal(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: signal_type = payload["signal_type"] target = payload.get("target", f"quality_signal:{signal_type}") decision = self._authorize_operator_action("quality.signal.record", target, payload, context) event = self._audit_operator_action( "quality.signal.recorded", target, context, decision, details={ "signal_type": signal_type, "asset_id": payload.get("asset_id"), "workflow_run_id": payload.get("workflow_run_id"), "agent_id": payload.get("agent_id"), "application_id": payload.get("application_id"), "metrics": dict(payload.get("metrics", {})), "ai_usage": dict(payload.get("ai_usage", {})), "cost": dict(payload.get("cost", {})), }, ) return {"event": event.to_dict(), "policy_decision": decision.to_dict()} def quality_cost_signals(self) -> dict[str, Any]: events = [ event for event in self.repository.list_audit_events() if event.operation in {"quality.signal.recorded", "agent.report.recorded"} ] ai_usage = [event.details.get("ai_usage", {}) for event in events if event.details.get("ai_usage")] costs = [event.details.get("cost", {}) for event in events if event.details.get("cost")] return { "retrieval": self.retrieval_quality_metrics(), "signal_count": len(events), "ai_usage": { "observation_count": len(ai_usage), "tokens": sum(int(item.get("tokens", 0)) for item in ai_usage), "provider_errors": sum(1 for item in ai_usage if item.get("error")), }, "cost": { "observation_count": len(costs), "estimated_total": sum(float(item.get("estimated", 0.0)) for item in costs), "currency": costs[0].get("currency") if costs else None, }, "attribution_dimensions": ["asset_id", "workflow_run_id", "agent_id", "application_id", "actor_id"], } def performance_smoke_report(self) -> dict[str, Any]: metrics = self.operational_metrics() return { "generated_at": utc_now().isoformat(), "smoke_targets": ["ingestion", "retrieval", "workflow", "export"], "measurements": { "ingestion_jobs": metrics["ingestion"]["job_count"], "retrieval_query_events": metrics["retrieval"]["query_events"], "workflow_runs": metrics["workflows"]["run_count"], "export_events": len( [event for event in self.repository.list_audit_events() if event.operation.startswith("export.")] ), }, "history_note": "Longitudinal pytest performance history is captured by tests/conftest.py.", } def mvp_compliance_report(self) -> dict[str, Any]: implemented = { "asset_registry": self.repository.list_assets is not None, "ingestion_jobs": hasattr(self.repository, "list_ingestion_jobs"), "governed_retrieval": True, "transformations": True, "workflow_jobs": True, "service_api": True, "agent_operations": True, "context_packages": True, "observability": True, "exports": True, "governance_reporting": True, } return { "generated_at": utc_now().isoformat(), "perspective": "V0.2 MVP acceptance", "requirements": [ {"requirement": "FR-200..FR-207 observability and recovery", "status": "implemented"}, {"requirement": "FR-220..FR-225 export and portability", "status": "implemented"}, {"requirement": "FR-120..FR-132 governance and audit", "status": "implemented"}, {"requirement": "FR-160..FR-188 agent-safe service operation", "status": "implemented"}, {"requirement": "P1/P2 enterprise adapters", "status": "explicitly_deferred"}, ], "implemented_capabilities": implemented, "remaining_gaps": [ "External webhook delivery adapters are represented as event contracts, not network emitters.", "Provider-backed AI cost depends on adapters supplying usage metadata.", "API request latency needs middleware instrumentation when FastAPI service runtime is deployed.", ], } def _export_asset_ids(self, payload: dict[str, Any], context: OperationContext) -> list[str]: scope = dict(payload.get("scope", payload)) if scope.get("asset_ids"): return list(dict.fromkeys(str(item) for item in scope["asset_ids"])) if scope.get("asset_id"): return [str(scope["asset_id"])] if scope.get("query"): result = self.query_assets(dict(scope["query"]), context) return [item["asset_id"] for item in result.get("results", ())] assets = self.repository.list_assets( lifecycle=LifecycleState(scope["lifecycle"]) if scope.get("lifecycle") else None, asset_type=scope.get("asset_type"), sensitivity=scope.get("sensitivity"), owner=scope.get("owner"), topic=scope.get("topic"), ) return [asset.id for asset in assets] def _inspect_failure(self, payload: dict[str, Any]) -> dict[str, Any]: kind = payload["kind"] identifier = payload["id"] if kind == "ingestion": return _ingestion_job_envelope(self.repository.get_ingestion_job(identifier)) if kind == "transformation": return _transformation_run_envelope(self.repository.get_transformation_run(identifier)) if kind == "workflow": return _workflow_run_envelope(self.repository.get_workflow_run(identifier)) raise ValidationError("Unsupported failure inspection kind", details={"kind": kind}) def _authorize_operator_action( self, action: str, resource: str, payload: dict[str, Any], context: OperationContext, ) -> PolicyDecision: try: decision = self.policy_gateway.authorize( context, action, resource, resource_metadata={"payload_keys": sorted(payload)}, ) except Exception as exc: decision = PolicyDecision.fail_closed( context.actor.id, action, resource, reason=str(exc) or "Operator policy gateway failed", context={"gateway_error": type(exc).__name__}, ) if not decision.allowed: event = self._audit_operator_action(action, resource, context, decision, outcome=AuditOutcome.DENIED) raise AuthorizationError( "Operation denied by policy", details={ "action": action, "resource": resource, "correlation_id": context.correlation_id, "audit_event_id": event.event_id, "policy_decision": decision.to_dict(), }, ) return decision def _audit_operator_action( self, operation: str, target: str, context: OperationContext, decision: PolicyDecision, *, outcome: AuditOutcome = AuditOutcome.SUCCESS, details: dict[str, Any] | None = None, ) -> AuditEvent: event = AuditEvent.from_context( operation, target, outcome, context, policy_decision=decision, details=details, ) return self.repository.save_audit_event(event) def _asset_bundle(self, asset_id: str) -> dict[str, Any]: asset = self.repository.get_asset(asset_id) metadata_records = self.repository.list_metadata_records(asset_id) representations = self.repository.list_representations(asset_id=asset_id) relationships = self.repository.list_relationships(source_id=asset_id) return { "asset": asset.to_dict(), "metadata_records": [record.to_dict() for record in metadata_records], "representations": [representation.to_dict() for representation in representations], "relationships": [relationship.to_dict() for relationship in relationships], "source_grounded": bool(asset.source_refs or representations), } def _agent_report(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: if not payload.get("summary"): raise ValidationError("Agent result report requires a summary", details={"required": ["summary"]}) decision = PolicyDecision.allow( context.actor.id, "agent.operation.report_result.record", f"agent:{context.actor.id}", context={"correlation_id": context.correlation_id}, ) event = AuditEvent.from_context( "agent.report.recorded", f"agent:{context.actor.id}", AuditOutcome.SUCCESS, context, policy_decision=decision, details={ "summary": payload["summary"], "result_ref": dict(payload.get("result_ref", {})), "metadata": dict(payload.get("metadata", {})), }, ) saved = self.repository.save_audit_event(event) return { "summary": payload["summary"], "result_ref": dict(payload.get("result_ref", {})), "metadata": dict(payload.get("metadata", {})), "audit_event": saved.to_dict(), } def context_package_schema(self) -> dict[str, Any]: return { "kind": "kontextual.context_package", "version": "1", "required": ["query"], "optional": [ "package_id", "title", "intent", "instructions", "constraints", "external_memory_refs", "metadata", "format", ], "formats": ["kontextual", "markitect"], "source_grounding": ["source_refs", "representations", "snippets", "relationships", "metadata_records"], "memory_boundary": "external_memory_refs are opaque pointers; memory graph contents are not embedded", } def assemble_context_package(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: query = dict(payload.get("query", {})) query.setdefault("include_snippets", True) query.setdefault("include_relationships", True) query.setdefault("max_snippets", 5) constraints = dict(payload.get("constraints", {})) decision = self._authorize_context_package(query, constraints, context) query_result = self.query_assets(query, context) package = _context_package_payload( payload, query_result, context, constraints=constraints, ) event = self._audit_context_package(package, context, decision) package["audit_event"] = event.to_dict() package["policy_decision"] = decision.to_dict() return package def _authorize_context_package( self, query: dict[str, Any], constraints: dict[str, Any], context: OperationContext, ) -> PolicyDecision: try: decision = self.policy_gateway.authorize( context, "context_package.assemble", "context_package:new", resource_metadata={ "query": query, "constraints": constraints, "external_memory_refs": "opaque", }, ) except Exception as exc: decision = PolicyDecision.fail_closed( context.actor.id, "context_package.assemble", "context_package:new", reason=str(exc) or "Context package policy gateway failed", context={"gateway_error": type(exc).__name__}, ) if not decision.allowed: event = AuditEvent.from_context( "context_package.assemble", "context_package:new", AuditOutcome.DENIED, context, policy_decision=decision, details={"query": query, "constraints": constraints}, ) saved = self.repository.save_audit_event(event) raise AuthorizationError( "Operation denied by policy", details={ "action": "context_package.assemble", "resource": "context_package:new", "correlation_id": context.correlation_id, "audit_event_id": saved.event_id, "policy_decision": decision.to_dict(), }, ) return decision def _audit_context_package( self, package: dict[str, Any], context: OperationContext, decision: PolicyDecision, ) -> AuditEvent: event = AuditEvent.from_context( "context_package.assemble", f"context_package:{package['package_id']}", AuditOutcome.SUCCESS, context, policy_decision=decision, details={ "result_count": package["result_count"], "source_grounded": package["source_grounded"], "format": package["format"], "external_memory_ref_count": len(package["external_memory_refs"]), }, ) return self.repository.save_audit_event(event) def create_app(runtime: ServiceRuntime | None = None): try: from fastapi import Depends, FastAPI, Header, HTTPException, Query, Request from fastapi.responses import JSONResponse, StreamingResponse except ImportError as exc: # pragma: no cover - exercised when optional extra is absent raise RuntimeError( "FastAPI service dependencies are not installed. Install kontextual-engine[service]." ) from exc globals()["Request"] = Request globals()["StreamingResponse"] = StreamingResponse runtime = runtime or ServiceRuntime() app = FastAPI( title="Kontextual Engine Service API", version=OPENAPI_VERSION, openapi_url="/openapi.json", docs_url="/docs", redoc_url="/redoc", ) app.state.kontextual_runtime = runtime def _is_cmis_request(request: Request) -> bool: return str(request.url.path).startswith("/cmis/") def _cmis_error_response(status_code: int, payload: dict[str, Any]) -> JSONResponse: details = dict(payload.get("details", {})) if isinstance(payload.get("details"), dict) else {} message = str(payload.get("message") or payload.get("detail") or "CMIS request failed") code = str(details.get("code") or payload.get("code") or "kontextual.cmis") cmis_exception = details.get("cmis_exception") resolved_status = status_code if not cmis_exception: lowered = message.lower() if code == "kontextual.not_found" or status_code == 404: cmis_exception = "objectNotFound" resolved_status = 404 elif code == "kontextual.authorization" or status_code == 403: cmis_exception = "permissionDenied" resolved_status = 403 elif code == "asset.version_conflict": cmis_exception = "updateConflict" resolved_status = 409 elif code == "cmis.not_supported" or lowered.startswith("unsupported cmis browser binding action"): cmis_exception = "notSupported" resolved_status = 405 elif "path already exists" in lowered or "cannot be moved" in lowered: cmis_exception = "constraint" resolved_status = 409 else: cmis_exception = "invalidArgument" resolved_status = 400 elif cmis_exception == "objectNotFound": resolved_status = 404 elif cmis_exception == "permissionDenied": resolved_status = 403 elif cmis_exception == "updateConflict": resolved_status = 409 elif cmis_exception == "notSupported": resolved_status = 405 elif cmis_exception == "constraint": resolved_status = 409 elif status_code == 422: resolved_status = 400 content = { "exception": cmis_exception, "message": message, "code": code, "details": details, } return JSONResponse(status_code=resolved_status, content=content) @app.exception_handler(NotFoundError) async def not_found_error_handler(_request, exc: NotFoundError) -> JSONResponse: if _is_cmis_request(_request): return _cmis_error_response(404, _error_payload(exc)) return JSONResponse(status_code=404, content=_error_payload(exc)) @app.exception_handler(AuthorizationError) async def authorization_error_handler(_request, exc: AuthorizationError) -> JSONResponse: if _is_cmis_request(_request): return _cmis_error_response(403, _authorization_error_payload(exc)) return JSONResponse(status_code=403, content=_authorization_error_payload(exc)) @app.exception_handler(ValidationError) async def validation_error_handler(_request, exc: ValidationError) -> JSONResponse: if _is_cmis_request(_request): return _cmis_error_response(422, _error_payload(exc)) return JSONResponse(status_code=422, content=_error_payload(exc)) @app.exception_handler(KontextualError) async def kontextual_error_handler(_request, exc: KontextualError) -> JSONResponse: if _is_cmis_request(_request): return _cmis_error_response(400, _error_payload(exc)) return JSONResponse(status_code=400, content=_error_payload(exc)) @app.exception_handler(HTTPException) async def http_exception_handler(_request, exc: HTTPException) -> JSONResponse: if _is_cmis_request(_request): detail = exc.detail if isinstance(exc.detail, dict) else {"message": str(exc.detail)} return _cmis_error_response(exc.status_code, detail) return JSONResponse(status_code=exc.status_code, content={"detail": exc.detail}) @app.get("/health", tags=["system"]) def health() -> dict[str, Any]: return runtime.health() @app.get("/ready", tags=["system"]) def ready() -> dict[str, Any]: return runtime.readiness() @app.get("/version", tags=["system"]) def version() -> dict[str, Any]: return runtime.version() prefix = f"/api/{runtime.api_version}" @app.get(f"{prefix}/health", tags=["system"]) def versioned_health() -> dict[str, Any]: return runtime.health() @app.get(f"{prefix}/ready", tags=["system"]) def versioned_ready() -> dict[str, Any]: return runtime.readiness() @app.get(f"{prefix}/version", tags=["system"]) def versioned_version() -> dict[str, Any]: return runtime.version() def context_from_headers( x_actor_id: str | None = Header(None), x_actor_type: str | None = Header(None), x_actor_display_name: str | None = Header(None), x_actor_external_ref: str | None = Header(None), x_actor_groups: str | None = Header(None), x_correlation_id: str | None = Header(None), x_delegated_actor_id: str | None = Header(None), x_delegated_actor_type: str | None = Header(None), x_delegated_actor_display_name: str | None = Header(None), x_delegated_actor_external_ref: str | None = Header(None), x_delegated_actor_groups: str | None = Header(None), x_agent_id: str | None = Header(None), x_agent_name: str | None = Header(None), x_agent_run_id: str | None = Header(None), x_agent_tool: str | None = Header(None), x_request_scope: str | None = Header(None), x_policy_scope: str | None = Header(None), ) -> OperationContext: actor_id = x_actor_id or x_agent_id or "api-user" actor_type = x_actor_type or ("ai_agent" if x_agent_id else "human") return runtime.operation_context( actor_id=actor_id, actor_type=actor_type, display_name=x_actor_display_name, external_ref=x_actor_external_ref, correlation_id=x_correlation_id, groups=_split_header_list(x_actor_groups), delegated_actor_id=x_delegated_actor_id, delegated_actor_type=x_delegated_actor_type or "human", delegated_actor_display_name=x_delegated_actor_display_name, delegated_actor_external_ref=x_delegated_actor_external_ref, delegated_actor_groups=_split_header_list(x_delegated_actor_groups), request_scope=_json_header(x_request_scope, "X-Request-Scope"), policy_scope=_json_header(x_policy_scope, "X-Policy-Scope"), agent_id=x_agent_id, agent_name=x_agent_name, agent_run_id=x_agent_run_id, agent_tool=x_agent_tool, ) def response(callable_obj, *args: Any, **kwargs: Any) -> Any: try: return callable_obj(*args, **kwargs) except NotFoundError as exc: raise HTTPException(status_code=404, detail=_error_payload(exc)) from exc except AuthorizationError as exc: raise HTTPException(status_code=403, detail=_authorization_error_payload(exc)) from exc except ValidationError as exc: raise HTTPException(status_code=422, detail=_error_payload(exc)) from exc except KontextualError as exc: raise HTTPException(status_code=400, detail=_error_payload(exc)) from exc except (KeyError, TypeError, ValueError) as exc: raise HTTPException( status_code=422, detail={ "code": "kontextual.validation", "message": "Invalid request payload", "details": {"error_type": type(exc).__name__, "message": str(exc)}, }, ) from exc @app.get(f"{prefix}/context", tags=["context"]) def current_context( context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return context.to_dict() @app.get("/cmis", tags=["cmis"]) def cmis_access_points() -> dict[str, Any]: return response(runtime.cmis_access_points) def browser_urls(request: Request, access_point_id: str) -> tuple[str, str]: return ( str(request.url_for("cmis_browser_entry", access_point_id=access_point_id)), str(request.url_for("cmis_browser_root", access_point_id=access_point_id)), ) def browser_include(value: bool | None, *, default: bool = True) -> bool: return default if value is None else value def browser_content_response( result, *, offset: int | None = None, length: int | None = None, range_header: str | None = None, ) -> StreamingResponse: representation = result.representation if range_header and offset is None and length is None and range_header.startswith("bytes="): range_spec = range_header.removeprefix("bytes=").split(",", 1)[0] start_text, _, end_text = range_spec.partition("-") if start_text.strip().isdigit(): offset = int(start_text) if end_text.strip().isdigit(): end = int(end_text) length = max(end - offset + 1, 0) start = max(offset or 0, 0) requested_length = None if length is None else max(length, 0) is_partial = start > 0 or requested_length is not None content_length = max(representation.size_bytes - start, 0) if requested_length is not None: content_length = min(content_length, requested_length) def chunks(): skip = start remaining = requested_length for chunk in result.chunks: if skip: if len(chunk) <= skip: skip -= len(chunk) continue chunk = chunk[skip:] skip = 0 if remaining is None: yield chunk continue if remaining <= 0: break part = chunk[:remaining] remaining -= len(part) if part: yield part if remaining <= 0: break headers = { "Content-Length": str(content_length), "ETag": representation.digest, "X-Kontextual-Representation-Id": representation.representation_id, "X-Kontextual-Storage-Ref": representation.storage_ref or "", } if representation.media_type: headers["Content-Type"] = representation.media_type if is_partial: end = start + content_length - 1 if content_length else start headers["Content-Range"] = f"bytes {start}-{end}/{representation.size_bytes}" return StreamingResponse( chunks(), status_code=206 if is_partial else 200, headers=headers, ) def unsupported_browser_selector(selector: str | None) -> dict[str, Any]: unsupported_details: dict[str, Any] = { "cmisselector": selector, "supported": [ "repositoryInfo", "typeChildren", "typeDescendants", "typeDefinition", "query", "object", "children", "parent", "parents", "properties", "allowableActions", "policies", "content", ], } if selector in {"descendants", "folderTree"}: unsupported_details.update( { "code": "cmis.not_supported", "cmis_exception": "notSupported", "unsupported_feature": "get_descendants" if selector == "descendants" else "get_folder_tree", "release_contract": "Navigation tree selectors remain unsupported for the first release.", } ) raise ValidationError( "Unsupported CMIS Browser Binding selector", details=unsupported_details, ) async def browser_action_payload(request: Request) -> dict[str, Any]: payload: dict[str, Any] = dict(request.query_params) content_type = request.headers.get("content-type", "") content_type_lower = content_type.lower() body = await request.body() if body: if "application/json" in content_type_lower: payload.update(json.loads(body.decode("utf-8"))) elif "multipart/form-data" in content_type_lower: form_values, file_values = _parse_multipart_form(content_type, body) payload.update(_flatten_form_values(form_values)) for field_name, file_value in file_values.items(): payload[field_name] = file_value["content"] if field_name in {"content", "contentStream", "file"} or "content" not in payload: payload["content"] = file_value["content"] payload.setdefault( "media_type", _cmis_media_type(file_value.get("content_type") or "application/octet-stream"), ) payload.setdefault("content_filename", file_value.get("filename")) else: parsed = parse_qs(body.decode("utf-8"), keep_blank_values=True) payload.update(_flatten_form_values(parsed)) properties = _cmis_browser_properties(payload) if properties: payload["properties"] = properties payload.setdefault("name", properties.get("cmis:name")) payload.setdefault("type_id", properties.get("cmis:objectTypeId")) return payload async def cmis_browser_post_action( access_point_id: str, request: Request, *, default_object_id: str | None, context: OperationContext, ) -> Any: payload = await browser_action_payload(request) action = payload.get("cmisaction") or payload.get("cmisAction") or payload.get("action") if action is None: type_id = payload.get("type_id") or payload.get("properties", {}).get("cmis:objectTypeId") if type_id in {CMISBaseType.DOCUMENT.value, "kontextual:document"}: action = "createDocument" elif type_id in {CMISBaseType.FOLDER.value, "kontextual:folder"}: action = "createFolder" object_id = payload.get("objectId") or payload.get("object_id") or default_object_id if action == "createFolder": return response( runtime.cmis_browser_create_folder, access_point_id, payload, context, parent_folder_id=object_id or payload.get("folderId") or payload.get("folder_id") or "cmis-root", ) if action == "createDocument": return response( runtime.cmis_browser_create_document, access_point_id, payload, context, parent_folder_id=object_id or payload.get("folderId") or payload.get("folder_id") or "cmis-root", ) if action in {"createDocumentFromSource", "copy"}: return response( runtime.cmis_browser_create_document_from_source, access_point_id, payload, context, parent_folder_id=object_id or payload.get("folderId") or payload.get("folder_id") or "cmis-root", ) if action in {"delete", "deleteObject"}: if not object_id: raise ValidationError("CMIS object id is required", details={"operation": "deleteObject"}) return response(runtime.cmis_delete_object, access_point_id, object_id, payload, context) if action == "deleteTree": if not object_id: raise ValidationError("CMIS object id is required", details={"operation": "deleteTree"}) return response(runtime.cmis_delete_tree, access_point_id, object_id, payload, context) if action in {"updateProperties", "update"}: if not object_id: raise ValidationError("CMIS object id is required", details={"operation": "updateProperties"}) response(runtime.cmis_update_properties, access_point_id, object_id, payload, context) return response(runtime.cmis_browser_object, access_point_id, object_id, context) if action == "move": if not object_id: raise ValidationError("CMIS object id is required", details={"operation": "moveObject"}) moved = response(runtime.cmis_move_object, access_point_id, object_id, payload, context) return cmis_browser_object(moved) if action in {"setContentStream", "setContent"}: if not object_id: raise ValidationError("CMIS object id is required", details={"operation": "setContentStream"}) response(runtime.cmis_set_content_stream, access_point_id, object_id, payload, context) return response(runtime.cmis_browser_object, access_point_id, object_id, context) if action in {"appendContentStream", "appendContent"}: if not object_id: raise ValidationError("CMIS object id is required", details={"operation": "appendContentStream"}) response(runtime.cmis_append_content_stream, access_point_id, object_id, payload, context) return response(runtime.cmis_browser_object, access_point_id, object_id, context) if action in {"deleteContent", "deleteContentStream"}: if not object_id: raise ValidationError("CMIS object id is required", details={"operation": "deleteContentStream"}) response(runtime.cmis_delete_content_stream, access_point_id, object_id, context, payload) return response(runtime.cmis_browser_object, access_point_id, object_id, context) if action in {"bulkUpdateProperties", "bulkUpdate"}: return response(runtime.cmis_bulk_update_properties, access_point_id, payload, context) raise ValidationError( "Unsupported CMIS Browser Binding action", details={ "cmisaction": action, "supported": [ "createFolder", "createDocument", "createDocumentFromSource", "delete", "deleteObject", "deleteTree", "updateProperties", "update", "move", "setContentStream", "setContent", "appendContentStream", "appendContent", "deleteContent", "deleteContentStream", ], }, ) @app.get("/cmis/{access_point_id}/browser", tags=["cmis"]) def cmis_browser_entry( access_point_id: str, request: Request, cmisselector: str | None = Query(None), typeId: str | None = Query(None), includePropertyDefinitions: bool = Query(False), q: str | None = Query(None), skipCount: int = Query(0), maxItems: int = Query(100), context: OperationContext = Depends(context_from_headers), ) -> Any: repository_url, root_folder_url = browser_urls(request, access_point_id) if cmisselector in (None, "", "repositoryInfo"): return response( runtime.cmis_browser_service_document, access_point_id, repository_url=repository_url, root_folder_url=root_folder_url, ) if cmisselector == "typeChildren": return response( runtime.cmis_browser_type_children, access_point_id, type_id=typeId, skip_count=skipCount, max_items=maxItems, include_property_definitions=includePropertyDefinitions, ) if cmisselector == "typeDescendants": return response( runtime.cmis_browser_type_descendants, access_point_id, type_id=typeId, include_property_definitions=includePropertyDefinitions, ) if cmisselector == "typeDefinition": return response(runtime.cmis_browser_type_definition, access_point_id, type_id=typeId) if cmisselector == "query": return response( runtime.cmis_browser_query, access_point_id, q or "SELECT * FROM cmis:document", context, skip_count=skipCount, max_items=maxItems, ) return unsupported_browser_selector(cmisselector) @app.post("/cmis/{access_point_id}/browser", tags=["cmis"]) async def cmis_browser_entry_action( access_point_id: str, request: Request, objectId: str | None = Query(None), context: OperationContext = Depends(context_from_headers), ) -> Any: return await cmis_browser_post_action( access_point_id, request, default_object_id=objectId, context=context, ) @app.get("/cmis/{access_point_id}/browser/root", tags=["cmis"]) def cmis_browser_root( access_point_id: str, cmisselector: str | None = Query(None), objectId: str | None = Query(None), path: str | None = Query(None), propertyFilter: str | None = Query(None, alias="filter"), includeAllowableActions: bool | None = Query(None), includeACL: bool | None = Query(None), includePathSegment: bool | None = Query(None), includeRelativePathSegment: bool | None = Query(None), offset: int | None = Query(None), length: int | None = Query(None), range_header: str | None = Header(None, alias="Range"), skipCount: int = Query(0), maxItems: int = Query(100), context: OperationContext = Depends(context_from_headers), ) -> Any: if cmisselector in (None, "", "object"): if path and not objectId: return response( runtime.cmis_browser_object_by_path, access_point_id, path, context, property_filter=propertyFilter, include_allowable_actions=browser_include(includeAllowableActions), include_acl=browser_include(includeACL, default=False), ) return response( runtime.cmis_browser_object, access_point_id, objectId, context, property_filter=propertyFilter, include_allowable_actions=browser_include(includeAllowableActions), include_acl=browser_include(includeACL, default=False), ) if cmisselector == "children": return response( runtime.cmis_browser_children, access_point_id, context, object_id=objectId, skip_count=skipCount, max_items=maxItems, property_filter=propertyFilter, include_allowable_actions=browser_include(includeAllowableActions), include_acl=browser_include(includeACL, default=False), include_path_segment=browser_include(includePathSegment), ) if cmisselector == "parent": if not objectId: return unsupported_browser_selector(cmisselector) return response(runtime.cmis_browser_parent, access_point_id, objectId, context) if cmisselector == "parents": if not objectId: return [] return response( runtime.cmis_browser_parents, access_point_id, objectId, context, include_relative_path_segment=browser_include(includeRelativePathSegment), ) if cmisselector == "properties": if path and not objectId: return response( runtime.cmis_browser_object_by_path, access_point_id, path, context, property_filter=propertyFilter, include_allowable_actions=False, include_acl=False, )["properties"] return response( runtime.cmis_browser_object, access_point_id, objectId, context, property_filter=propertyFilter, include_allowable_actions=False, include_acl=False, )["properties"] if cmisselector == "allowableActions": if path and not objectId: return response( runtime.cmis_browser_object_by_path, access_point_id, path, context, include_allowable_actions=True, )["allowableActions"] return response( runtime.cmis_browser_object, access_point_id, objectId, context, include_allowable_actions=True, )["allowableActions"] if cmisselector == "policies": return [] if cmisselector == "content": if not objectId: return unsupported_browser_selector(cmisselector) result = response(runtime.cmis_content_stream_bytes, access_point_id, objectId, context) return browser_content_response(result, offset=offset, length=length, range_header=range_header) return unsupported_browser_selector(cmisselector) @app.get("/cmis/{access_point_id}/browser/root/{object_path:path}", tags=["cmis"]) def cmis_browser_root_path( access_point_id: str, object_path: str, cmisselector: str | None = Query(None), objectId: str | None = Query(None), propertyFilter: str | None = Query(None, alias="filter"), includeAllowableActions: bool | None = Query(None), includeACL: bool | None = Query(None), includePathSegment: bool | None = Query(None), includeRelativePathSegment: bool | None = Query(None), offset: int | None = Query(None), length: int | None = Query(None), range_header: str | None = Header(None, alias="Range"), skipCount: int = Query(0), maxItems: int = Query(100), context: OperationContext = Depends(context_from_headers), ) -> Any: path = _normalize_cmis_path(object_path) browser_object: dict[str, Any] | None = None resolution_object: dict[str, Any] | None = None def object_by_path() -> dict[str, Any]: nonlocal browser_object if browser_object is None: browser_object = response( runtime.cmis_browser_object_by_path, access_point_id, path, context, property_filter=propertyFilter, include_allowable_actions=browser_include(includeAllowableActions), include_acl=browser_include(includeACL, default=False), ) return browser_object def object_for_resolution() -> dict[str, Any]: nonlocal resolution_object if resolution_object is None: resolution_object = response( runtime.cmis_browser_object_by_path, access_point_id, path, context, include_allowable_actions=False, include_acl=False, ) return resolution_object def resolved_object_id() -> str | None: if objectId: return objectId object_id_property = object_for_resolution().get("properties", {}).get("cmis:objectId", {}) if isinstance(object_id_property, dict): return object_id_property.get("value") return None if cmisselector in (None, "", "object"): return object_by_path() if cmisselector == "children": return response( runtime.cmis_browser_children, access_point_id, context, object_id=resolved_object_id(), skip_count=skipCount, max_items=maxItems, property_filter=propertyFilter, include_allowable_actions=browser_include(includeAllowableActions), include_acl=browser_include(includeACL, default=False), include_path_segment=browser_include(includePathSegment), ) if cmisselector == "parent": object_id = resolved_object_id() if not object_id: return unsupported_browser_selector(cmisselector) return response(runtime.cmis_browser_parent, access_point_id, object_id, context) if cmisselector == "parents": object_id = resolved_object_id() if not object_id: return [] return response( runtime.cmis_browser_parents, access_point_id, object_id, context, include_relative_path_segment=browser_include(includeRelativePathSegment), ) if cmisselector == "properties": return object_by_path()["properties"] if cmisselector == "allowableActions": return response( runtime.cmis_browser_object_by_path, access_point_id, path, context, include_allowable_actions=True, )["allowableActions"] if cmisselector == "policies": return [] if cmisselector == "content": object_id = resolved_object_id() if not object_id: return unsupported_browser_selector(cmisselector) result = response(runtime.cmis_content_stream_bytes, access_point_id, object_id, context) return browser_content_response(result, offset=offset, length=length, range_header=range_header) return unsupported_browser_selector(cmisselector) @app.post("/cmis/{access_point_id}/browser/root", tags=["cmis"]) async def cmis_browser_root_action( access_point_id: str, request: Request, objectId: str | None = Query(None), context: OperationContext = Depends(context_from_headers), ) -> Any: return await cmis_browser_post_action( access_point_id, request, default_object_id=objectId or "cmis-root", context=context, ) @app.post("/cmis/{access_point_id}/browser/root/{object_path:path}", tags=["cmis"]) async def cmis_browser_root_path_action( access_point_id: str, object_path: str, request: Request, objectId: str | None = Query(None), context: OperationContext = Depends(context_from_headers), ) -> Any: default_object_id = objectId if not default_object_id: browser_object = response( runtime.cmis_browser_object_by_path, access_point_id, _normalize_cmis_path(object_path), context, ) object_id_property = browser_object.get("properties", {}).get("cmis:objectId", {}) if isinstance(object_id_property, dict): default_object_id = object_id_property.get("value") return await cmis_browser_post_action( access_point_id, request, default_object_id=default_object_id, context=context, ) @app.get("/cmis/{access_point_id}/browser/types", tags=["cmis"]) def cmis_types(access_point_id: str) -> dict[str, Any]: return response(runtime.cmis_type_definitions, access_point_id) @app.get("/cmis/{access_point_id}/browser/children", tags=["cmis"]) def cmis_children( access_point_id: str, folder_id: str | None = Query(None), skip_count: int = Query(0), max_items: int = Query(100), context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response( runtime.cmis_children, access_point_id, context, folder_id=folder_id, skip_count=skip_count, max_items=max_items, ) @app.get("/cmis/{access_point_id}/browser/object/{object_id:path}", tags=["cmis"]) def cmis_object( access_point_id: str, object_id: str, context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.cmis_object, access_point_id, object_id, context) @app.get("/cmis/{access_point_id}/browser/content/{object_id:path}", tags=["cmis"]) def cmis_content_stream( access_point_id: str, object_id: str, context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.cmis_content_stream, access_point_id, object_id, context) @app.get("/cmis/{access_point_id}/browser/content-bytes/{object_id:path}", tags=["cmis"]) def cmis_content_stream_bytes( access_point_id: str, object_id: str, offset: int | None = Query(None), length: int | None = Query(None), range_header: str | None = Header(None, alias="Range"), context: OperationContext = Depends(context_from_headers), ) -> Any: result = response(runtime.cmis_content_stream_bytes, access_point_id, object_id, context) return browser_content_response(result, offset=offset, length=length, range_header=range_header) @app.get("/cmis/{access_point_id}/browser/acl/{object_id:path}", tags=["cmis"]) def cmis_acl( access_point_id: str, object_id: str, context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.cmis_acl, access_point_id, object_id, context) @app.get("/cmis/{access_point_id}/browser/parents/{object_id:path}", tags=["cmis"]) def cmis_object_parents( access_point_id: str, object_id: str, context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.cmis_object_parents, access_point_id, object_id, context) @app.post("/cmis/{access_point_id}/browser/document", tags=["cmis"]) def cmis_create_document( access_point_id: str, payload: dict[str, Any], context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.cmis_create_document, access_point_id, payload, context) @app.post("/cmis/{access_point_id}/browser/folder", tags=["cmis"]) def cmis_create_folder( access_point_id: str, payload: dict[str, Any], context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.cmis_create_folder, access_point_id, payload, context) @app.post("/cmis/{access_point_id}/browser/object/{object_id:path}/properties", tags=["cmis"]) def cmis_update_properties( access_point_id: str, object_id: str, payload: dict[str, Any], context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.cmis_update_properties, access_point_id, object_id, payload, context) @app.post("/cmis/{access_point_id}/browser/object/{object_id:path}/content", tags=["cmis"]) def cmis_set_content_stream( access_point_id: str, object_id: str, payload: dict[str, Any], context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.cmis_set_content_stream, access_point_id, object_id, payload, context) @app.post("/cmis/{access_point_id}/browser/object/{object_id:path}/delete", tags=["cmis"]) def cmis_delete_object( access_point_id: str, object_id: str, payload: dict[str, Any] | None = None, context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.cmis_delete_object, access_point_id, object_id, payload or {}, context) @app.get("/cmis/{access_point_id}/browser/query", tags=["cmis"]) def cmis_query( access_point_id: str, q: str = Query("SELECT * FROM cmis:document"), skip_count: int = Query(0), max_items: int = Query(100), context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response( runtime.cmis_query, access_point_id, q, context, skip_count=skip_count, max_items=max_items, ) @app.get("/cmis/{access_point_id}/browser/relationships", tags=["cmis"]) def cmis_relationships( access_point_id: str, object_id: str | None = Query(None), target_id: str | None = Query(None, alias="targetId"), relationship_direction: str | None = Query(None, alias="relationshipDirection"), context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response( runtime.cmis_relationships, access_point_id, context, object_id=object_id, target_id=target_id, relationship_direction=relationship_direction, ) @app.get("/cmis/{access_point_id}/browser/changes", tags=["cmis"]) def cmis_changes( access_point_id: str, skip_count: int = Query(0), max_items: int = Query(100), context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response( runtime.cmis_change_log, access_point_id, context, skip_count=skip_count, max_items=max_items, ) @app.post(f"{prefix}/assets", tags=["assets"]) def create_asset( payload: dict[str, Any], context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.create_asset, payload, context) @app.get(f"{prefix}/assets", tags=["assets"]) def list_assets( lifecycle: str | None = Query(None), asset_type: str | None = Query(None), sensitivity: str | None = Query(None), owner: str | None = Query(None), topic: str | None = Query(None), review_state: str | None = Query(None), ) -> dict[str, Any]: return response( runtime.list_assets, lifecycle=lifecycle, asset_type=asset_type, sensitivity=sensitivity, owner=owner, topic=topic, review_state=review_state, ) @app.get(f"{prefix}/assets/{{asset_id}}", tags=["assets"]) def get_asset(asset_id: str) -> dict[str, Any]: return response(runtime.get_asset, asset_id) @app.get(f"{prefix}/assets/{{asset_id}}/representations/{{representation_id}}/content", tags=["assets"]) def get_representation_content( asset_id: str, representation_id: str, context: OperationContext = Depends(context_from_headers), ) -> Any: result = response(runtime.representation_content_stream, asset_id, representation_id, context) representation = result.representation return StreamingResponse( result.chunks, media_type=representation.media_type, headers={ "Content-Length": str(representation.size_bytes), "ETag": representation.digest, "X-Kontextual-Representation-Id": representation.representation_id, "X-Kontextual-Storage-Ref": representation.storage_ref or "", }, ) @app.post(f"{prefix}/assets/{{asset_id}}/metadata", tags=["metadata"]) def add_metadata( asset_id: str, payload: dict[str, Any], context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.add_metadata_record, asset_id, payload, context) @app.get(f"{prefix}/assets/{{asset_id}}/metadata", tags=["metadata"]) def list_metadata(asset_id: str) -> dict[str, Any]: return response(runtime.list_metadata_records, asset_id) @app.post(f"{prefix}/assets/{{asset_id}}/lifecycle", tags=["assets"]) def transition_lifecycle( asset_id: str, payload: dict[str, Any], context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.transition_lifecycle, asset_id, payload, context) @app.post(f"{prefix}/relationships", tags=["relationships"]) def create_relationship( payload: dict[str, Any], context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.create_relationship, payload, context) @app.get(f"{prefix}/relationships", tags=["relationships"]) def list_relationships( source_id: str | None = Query(None), target_id: str | None = Query(None), ) -> dict[str, Any]: return response(runtime.list_relationships, source_id=source_id, target_id=target_id) @app.get(f"{prefix}/audit/events", tags=["audit"]) def list_audit_events( target: str | None = Query(None), correlation_id: str | None = Query(None), ) -> dict[str, Any]: return response(runtime.list_audit_events, target=target, correlation_id=correlation_id) @app.post(f"{prefix}/policy/evaluate", tags=["policy"]) def evaluate_policy( payload: dict[str, Any], context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.evaluate_policy, payload, context) @app.get(f"{prefix}/ingestion/capabilities", tags=["ingestion"]) def ingestion_capabilities() -> dict[str, Any]: return response(runtime.ingestion_capabilities) @app.post(f"{prefix}/ingestion/jobs", tags=["ingestion"]) def start_ingestion_job( payload: dict[str, Any], context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.start_ingestion_job, payload, context) @app.get(f"{prefix}/ingestion/jobs", tags=["ingestion"]) def list_ingestion_jobs(status: str | None = Query(None)) -> dict[str, Any]: return response(runtime.list_ingestion_jobs, status=status) @app.get(f"{prefix}/ingestion/jobs/{{job_id}}", tags=["ingestion"]) def get_ingestion_job(job_id: str) -> dict[str, Any]: return response(runtime.get_ingestion_job, job_id) @app.post(f"{prefix}/retrieval/index/refresh", tags=["retrieval"]) def refresh_retrieval_index() -> dict[str, Any]: return response(runtime.refresh_retrieval_index) @app.post(f"{prefix}/retrieval/assets", tags=["retrieval"]) def query_assets( payload: dict[str, Any], context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.query_assets, payload, context) @app.post(f"{prefix}/retrieval/context-entities", tags=["retrieval"]) def query_context_entities( payload: dict[str, Any], context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.query_context_entities, payload, context) @app.post(f"{prefix}/retrieval/relationships", tags=["retrieval"]) def query_retrieval_relationships( payload: dict[str, Any], context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.query_relationships, payload, context) @app.post(f"{prefix}/retrieval/feedback", tags=["retrieval"]) def record_retrieval_feedback( payload: dict[str, Any], context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.record_retrieval_feedback, payload, context) @app.get(f"{prefix}/retrieval/feedback", tags=["retrieval"]) def list_retrieval_feedback( correlation_id: str | None = Query(None), label: str | None = Query(None), ) -> dict[str, Any]: return response(runtime.list_retrieval_feedback, correlation_id=correlation_id, label=label) @app.get(f"{prefix}/retrieval/quality", tags=["retrieval"]) def retrieval_quality_metrics() -> dict[str, Any]: return response(runtime.retrieval_quality_metrics) @app.get(f"{prefix}/transformations/operations", tags=["transformations"]) def list_transformation_operations() -> dict[str, Any]: return response(runtime.list_transformation_operations) @app.post(f"{prefix}/transformations/runs", tags=["transformations"]) def execute_transformation( payload: dict[str, Any], context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.execute_transformation, payload, context) @app.get(f"{prefix}/transformations/runs", tags=["transformations"]) def list_transformation_runs( status: str | None = Query(None), operation_id: str | None = Query(None), ) -> dict[str, Any]: return response(runtime.list_transformation_runs, status=status, operation_id=operation_id) @app.get(f"{prefix}/transformations/runs/{{run_id}}", tags=["transformations"]) def get_transformation_run(run_id: str) -> dict[str, Any]: return response(runtime.get_transformation_run, run_id) @app.post(f"{prefix}/transformations/runs/{{run_id}}/retry", tags=["transformations"]) def retry_transformation_run( run_id: str, context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.retry_transformation_run, run_id, context) @app.post(f"{prefix}/transformations/runs/{{run_id}}/cancel", tags=["transformations"]) def cancel_transformation_run( run_id: str, payload: dict[str, Any], context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.cancel_transformation_run, run_id, payload, context) @app.post(f"{prefix}/workflows/templates", tags=["workflows"]) def register_workflow_template( payload: dict[str, Any], context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.register_workflow_template, payload, context) @app.get(f"{prefix}/workflows/templates", tags=["workflows"]) def list_workflow_templates(template_id: str | None = Query(None)) -> dict[str, Any]: return response(runtime.list_workflow_templates, template_id=template_id) @app.get(f"{prefix}/workflows/templates/{{template_id}}", tags=["workflows"]) def get_workflow_template( template_id: str, version: str | None = Query(None), ) -> dict[str, Any]: return response(runtime.get_workflow_template, template_id, version=version) @app.post(f"{prefix}/workflows/runs", tags=["workflows"]) def invoke_workflow_run( payload: dict[str, Any], context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.invoke_workflow_run, payload, context) @app.post(f"{prefix}/workflows/runs/queue", tags=["workflows"]) def queue_workflow_run( payload: dict[str, Any], context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.queue_workflow_run, payload, context) @app.get(f"{prefix}/workflows/runs", tags=["workflows"]) def list_workflow_runs( status: str | None = Query(None), template_id: str | None = Query(None), ) -> dict[str, Any]: return response(runtime.list_workflow_runs, status=status, template_id=template_id) @app.get(f"{prefix}/workflows/runs/{{run_id}}", tags=["workflows"]) def get_workflow_run(run_id: str) -> dict[str, Any]: return response(runtime.get_workflow_run, run_id) @app.post(f"{prefix}/workflows/runs/{{run_id}}/resume", tags=["workflows"]) def resume_workflow_run( run_id: str, context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.resume_workflow_run, run_id, context) @app.post(f"{prefix}/workflows/runs/{{run_id}}/retry", tags=["workflows"]) def retry_workflow_run( run_id: str, context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.retry_workflow_run, run_id, context) @app.post(f"{prefix}/workflows/runs/{{run_id}}/cancel", tags=["workflows"]) def cancel_workflow_run( run_id: str, payload: dict[str, Any], context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.cancel_workflow_run, run_id, payload, context) @app.get(f"{prefix}/workflows/runs/{{run_id}}/reconstruction", tags=["workflows"]) def reconstruct_workflow_run(run_id: str) -> dict[str, Any]: return response(runtime.reconstruct_workflow_run, run_id) @app.get(f"{prefix}/workflows/reviews", tags=["workflows"]) def list_workflow_review_tasks( status: str | None = Query(WorkflowReviewStatus.OPEN.value), workflow_run_id: str | None = Query(None), ) -> dict[str, Any]: return response(runtime.list_workflow_review_tasks, status=status, workflow_run_id=workflow_run_id) @app.get(f"{prefix}/workflows/exceptions", tags=["workflows"]) def list_workflow_exceptions( status: str | None = Query(WorkflowExceptionStatus.OPEN.value), kind: str | None = Query(None), workflow_run_id: str | None = Query(None), ) -> dict[str, Any]: return response( runtime.list_workflow_exceptions, status=status, kind=kind, workflow_run_id=workflow_run_id, ) @app.post(f"{prefix}/workflows/runs/{{run_id}}/reviews/{{review_id}}/decision", tags=["workflows"]) def record_workflow_review_decision( run_id: str, review_id: str, payload: dict[str, Any], context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.record_workflow_review_decision, run_id, review_id, payload, context) @app.get(f"{prefix}/agents/operations", tags=["agents"]) def list_agent_operations() -> dict[str, Any]: return response(runtime.list_agent_operations) @app.get(f"{prefix}/agents/operations/{{operation_id}}", tags=["agents"]) def get_agent_operation(operation_id: str) -> dict[str, Any]: return response(runtime.get_agent_operation, operation_id) @app.post(f"{prefix}/agents/operations/{{operation_id}}", tags=["agents"]) def execute_agent_operation( operation_id: str, payload: dict[str, Any], context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.execute_agent_operation, operation_id, payload, context) @app.get(f"{prefix}/context-packages/schema", tags=["context-packages"]) def context_package_schema() -> dict[str, Any]: return response(runtime.context_package_schema) @app.post(f"{prefix}/context-packages", tags=["context-packages"]) def assemble_context_package( payload: dict[str, Any], context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.assemble_context_package, payload, context) @app.get(f"{prefix}/operations/metrics", tags=["operations"]) def operational_metrics() -> dict[str, Any]: return response(runtime.operational_metrics) @app.get(f"{prefix}/operations/jobs", tags=["operations"]) def inspect_jobs( kind: str | None = Query(None), status: str | None = Query(None), correlation_id: str | None = Query(None), ) -> dict[str, Any]: return response(runtime.inspect_jobs, kind=kind, status=status, correlation_id=correlation_id) @app.get(f"{prefix}/operations/events", tags=["operations"]) def operational_events( correlation_id: str | None = Query(None), operation_prefix: str | None = Query(None), ) -> dict[str, Any]: return response(runtime.operational_events, correlation_id=correlation_id, operation_prefix=operation_prefix) @app.get(f"{prefix}/operations/recovery/actions", tags=["operations"]) def recovery_actions() -> dict[str, Any]: return response(runtime.recovery_actions) @app.post(f"{prefix}/operations/recovery/{{action}}", tags=["operations"]) def execute_recovery_action( action: str, payload: dict[str, Any], context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.execute_recovery_action, action, payload, context) @app.post(f"{prefix}/exports", tags=["exports"]) def create_export_package( payload: dict[str, Any], context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.create_export_package, payload, context) @app.post(f"{prefix}/exports/validate", tags=["exports"]) def validate_export_package( payload: dict[str, Any], context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.validate_export_package, payload, context) @app.post(f"{prefix}/governance/report", tags=["governance"]) def governance_report( payload: dict[str, Any], context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.governance_report, payload, context) @app.get(f"{prefix}/extensions/catalog", tags=["extensions"]) def extension_catalog() -> dict[str, Any]: return response(runtime.extension_catalog) @app.post(f"{prefix}/extensions/events", tags=["extensions"]) def emit_extension_event( payload: dict[str, Any], context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.emit_extension_event, payload, context) @app.post(f"{prefix}/quality/signals", tags=["quality"]) def record_quality_signal( payload: dict[str, Any], context: OperationContext = Depends(context_from_headers), ) -> dict[str, Any]: return response(runtime.record_quality_signal, payload, context) @app.get(f"{prefix}/quality/cost", tags=["quality"]) def quality_cost_signals() -> dict[str, Any]: return response(runtime.quality_cost_signals) @app.get(f"{prefix}/performance/smoke", tags=["compliance"]) def performance_smoke_report() -> dict[str, Any]: return response(runtime.performance_smoke_report) @app.get(f"{prefix}/compliance/mvp", tags=["compliance"]) def mvp_compliance_report() -> dict[str, Any]: return response(runtime.mvp_compliance_report) return app def _cmis_profiles() -> tuple[CMISAccessProfile, ...]: return ( CMISAccessProfile.readonly_browser(), CMISAccessProfile.governed_authoring(), CMISAccessProfile.admin_export(), CMISAccessProfile.compat_tck(), ) def _cmis_access_point(profile: CMISAccessProfile) -> CMISAccessPoint: repository_id = profile.name if profile.name == "compat-tck" else f"kontextual-{profile.name}" return CMISAccessPoint( access_point_id=profile.name, repository_id=repository_id, profile=profile, base_path=f"/cmis/{profile.name}/browser", metadata={"repository_name": f"Kontextual Engine {profile.name}"}, ) def _cmis_asset_id(object_id: str | None) -> str: if not object_id: raise ValidationError("CMIS object id is required", details={"field": "object_id"}) normalized = object_id.strip("/") if normalized.startswith("cmis:asset:"): return normalized.removeprefix("cmis:asset:") if normalized.startswith("asset:"): return normalized.removeprefix("asset:") return normalized def _cmis_folder_path(folder_id: str | None) -> str | None: if not folder_id: return None normalized = folder_id.strip() if normalized in {"cmis-root", "root", "/"}: return "/" if normalized.startswith("cmis:folder:"): return "/" + normalized.removeprefix("cmis:folder:").replace("::", "/") return _normalize_cmis_path(normalized) def _normalize_cmis_path(path: str) -> str: parts = [part.strip().strip("/") for part in path.replace("\\", "/").split("/") if part.strip("/")] return "/" + "/".join(parts) _CMIS_QUERY_RE = re.compile( r"^\s*SELECT\s+(?P