Files
open-cmis-tck/src/open_cmis_tck/profile.py
2026-05-08 00:02:20 +02:00

169 lines
5.6 KiB
Python

"""CMIS-specific profile diagnostics for guide-board target profiles."""
from __future__ import annotations
from typing import Any
from urllib.parse import urlparse
KNOWN_CMIS_REQUIREMENTS = {
"cmis.repository-info",
"cmis.type-definitions",
"cmis.object-services",
"cmis.content-streams",
"cmis.navigation-services",
"cmis.query",
"cmis.relationships",
"cmis.acl",
"cmis.policies",
"cmis.versioning",
"cmis.change-log",
"cmis.extensions",
}
def validate_cmis_profile_config(
target_profile: dict[str, Any],
assessment_profile: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Return actionable CMIS diagnostics for guide-board profiles.
The generic guide-board target profile remains the persisted contract. This
helper explains how the OpenCMIS extension interprets those generic fields.
"""
diagnostics: list[dict[str, str]] = []
browser_endpoints = [
endpoint
for endpoint in target_profile.get("endpoints", [])
if endpoint.get("binding") == "cmis-browser"
]
if target_profile.get("subject_type") != "cmis-browser-binding-endpoint":
diagnostics.append(
_diagnostic(
"warning",
"subject_type",
"CMIS targets should use subject_type 'cmis-browser-binding-endpoint'.",
)
)
if not browser_endpoints:
diagnostics.append(
_diagnostic(
"error",
"endpoints",
"Add one endpoint with binding 'cmis-browser' and the Browser Binding service document URL.",
)
)
for index, endpoint in enumerate(browser_endpoints):
parsed = urlparse(str(endpoint.get("url", "")))
if parsed.scheme not in {"http", "https"} or not parsed.netloc:
diagnostics.append(
_diagnostic(
"error",
f"endpoints[{index}].url",
"Use an absolute http(s) CMIS Browser Binding URL.",
)
)
declared = set(target_profile.get("declared_capabilities", []))
unknown_declared = sorted(declared - KNOWN_CMIS_REQUIREMENTS)
for requirement_ref in unknown_declared:
diagnostics.append(
_diagnostic(
"warning",
"declared_capabilities",
f"Declared CMIS capability {requirement_ref!r} is not in the extension's known requirement list.",
)
)
known_gap_refs = {
requirement_ref
for gap in target_profile.get("known_gaps", [])
for requirement_ref in gap.get("requirement_refs", [])
}
unexpected_gap_refs = sorted(known_gap_refs - KNOWN_CMIS_REQUIREMENTS)
for requirement_ref in unexpected_gap_refs:
diagnostics.append(
_diagnostic(
"warning",
"known_gaps",
f"Known gap {requirement_ref!r} is not in the extension's known requirement list.",
)
)
runtime_policy = (assessment_profile or {}).get("runtime_policy", {})
opencmis_policy = runtime_policy.get("opencmis_tck", {})
if opencmis_policy and not isinstance(opencmis_policy, dict):
diagnostics.append(
_diagnostic(
"error",
"runtime_policy.opencmis_tck",
"OpenCMIS runtime policy must be an object when present.",
)
)
opencmis_policy = {}
timeout = runtime_policy.get("timeout_seconds")
if assessment_profile is not None and not isinstance(timeout, (int, float)):
diagnostics.append(
_diagnostic(
"warning",
"runtime_policy.timeout_seconds",
"Set timeout_seconds to bound preflight and TCK execution.",
)
)
repository_id = None
if isinstance(opencmis_policy, dict):
repository_id = opencmis_policy.get("repository_id")
if repository_id is not None and not isinstance(repository_id, str):
diagnostics.append(
_diagnostic(
"error",
"runtime_policy.opencmis_tck.repository_id",
"repository_id must be a string when configured.",
)
)
credentials_ref = target_profile.get("credentials_ref")
auth_mode = "anonymous"
if isinstance(credentials_ref, str) and credentials_ref:
auth_mode = _auth_mode(credentials_ref)
if auth_mode == "unknown":
diagnostics.append(
_diagnostic(
"warning",
"credentials_ref",
"Use null, env:USER_VAR,PASSWORD_VAR, or file:/path/to/credentials.json.",
)
)
status = "invalid" if any(item["severity"] == "error" for item in diagnostics) else "valid"
return {
"status": status,
"diagnostics": diagnostics,
"cmis_config": {
"browser_binding_url": browser_endpoints[0]["url"] if browser_endpoints else None,
"repository_id": repository_id,
"auth_mode": auth_mode,
"credentials_ref": credentials_ref,
"declared_capabilities": sorted(declared),
"known_gap_refs": sorted(known_gap_refs),
"timeout_seconds": timeout,
},
}
def _diagnostic(severity: str, field: str, message: str) -> dict[str, str]:
return {"severity": severity, "field": field, "message": message}
def _auth_mode(credentials_ref: str) -> str:
if credentials_ref.startswith("env:"):
return "env"
if credentials_ref.startswith("file:"):
return "file"
return "unknown"