generated from coulomb/repo-seed
Updated by fix-consistency on 2026-05-07: - update .custodian-brief.md for open-cmis-tck
349 lines
11 KiB
Python
349 lines
11 KiB
Python
"""CMIS Browser Binding preflight runner."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Any
|
|
from urllib.error import HTTPError, URLError
|
|
from urllib.request import Request, urlopen
|
|
|
|
|
|
def run(context: dict[str, Any]) -> dict[str, Any]:
|
|
target = context["target_profile"]
|
|
endpoint = _browser_endpoint(target)
|
|
if endpoint is None:
|
|
return {
|
|
"result": "fail",
|
|
"observations": [
|
|
"Target profile does not declare a CMIS Browser Binding endpoint."
|
|
],
|
|
"facts": {
|
|
"endpoint_found": False,
|
|
},
|
|
"artifact_refs": [],
|
|
}
|
|
|
|
timeout = _timeout_seconds(context)
|
|
artifact_refs: list[str] = []
|
|
request = Request(
|
|
endpoint["url"],
|
|
headers={
|
|
"Accept": "application/json, */*;q=0.1",
|
|
"User-Agent": "guide-board-open-cmis-tck-preflight/0.1.0",
|
|
},
|
|
)
|
|
try:
|
|
with urlopen(request, timeout=timeout) as response:
|
|
status_code = response.status
|
|
content_type = response.headers.get("Content-Type", "")
|
|
headers = dict(response.headers.items())
|
|
body = response.read(1024 * 1024)
|
|
artifact_refs = _write_response_artifacts(
|
|
context,
|
|
status_code,
|
|
content_type,
|
|
headers,
|
|
body,
|
|
)
|
|
except HTTPError as exc:
|
|
body = exc.read(1024 * 1024)
|
|
content_type = exc.headers.get("Content-Type", "")
|
|
artifact_refs = _write_response_artifacts(
|
|
context,
|
|
exc.code,
|
|
content_type,
|
|
dict(exc.headers.items()),
|
|
body,
|
|
)
|
|
return {
|
|
"result": "infrastructure_error",
|
|
"observations": [
|
|
f"CMIS Browser Binding endpoint returned HTTP {exc.code}."
|
|
],
|
|
"facts": {
|
|
"endpoint_found": True,
|
|
"url": endpoint["url"],
|
|
"http_status": exc.code,
|
|
"content_type": content_type,
|
|
},
|
|
"artifact_refs": artifact_refs,
|
|
}
|
|
except URLError as exc:
|
|
return {
|
|
"result": "infrastructure_error",
|
|
"observations": [
|
|
f"CMIS Browser Binding endpoint is not reachable: {exc.reason}."
|
|
],
|
|
"facts": {
|
|
"endpoint_found": True,
|
|
"url": endpoint["url"],
|
|
"error": str(exc.reason),
|
|
},
|
|
"artifact_refs": artifact_refs,
|
|
}
|
|
except TimeoutError:
|
|
return {
|
|
"result": "infrastructure_error",
|
|
"observations": [
|
|
f"CMIS Browser Binding endpoint did not respond within {timeout} seconds."
|
|
],
|
|
"facts": {
|
|
"endpoint_found": True,
|
|
"url": endpoint["url"],
|
|
"timeout_seconds": timeout,
|
|
},
|
|
"artifact_refs": artifact_refs,
|
|
}
|
|
|
|
facts: dict[str, Any] = {
|
|
"endpoint_found": True,
|
|
"url": endpoint["url"],
|
|
"binding": endpoint["binding"],
|
|
"http_status": status_code,
|
|
"content_type": content_type,
|
|
}
|
|
|
|
parsed = _parse_json(body)
|
|
if parsed is None:
|
|
facts["json_detected"] = False
|
|
return {
|
|
"result": "warning",
|
|
"observations": [
|
|
"CMIS Browser Binding endpoint is reachable but did not return parseable JSON."
|
|
],
|
|
"facts": facts,
|
|
"artifact_refs": artifact_refs,
|
|
}
|
|
|
|
facts["json_detected"] = True
|
|
repository_facts = _repository_facts(parsed, context)
|
|
facts.update(repository_facts)
|
|
unsupported = [
|
|
item
|
|
for item in repository_facts.get("capability_posture", [])
|
|
if item.get("status") == "unsupported"
|
|
]
|
|
expected_gaps = [
|
|
item
|
|
for item in repository_facts.get("capability_posture", [])
|
|
if item.get("status") == "expected_gap"
|
|
]
|
|
if unsupported:
|
|
return {
|
|
"result": "fail",
|
|
"observations": [
|
|
"CMIS Browser Binding endpoint is reachable, but declared capabilities are not supported by repository capability flags.",
|
|
"Unsupported declared requirements: "
|
|
+ ", ".join(item["requirement_ref"] for item in unsupported)
|
|
+ ".",
|
|
],
|
|
"facts": facts,
|
|
"artifact_refs": artifact_refs,
|
|
}
|
|
|
|
observations = [
|
|
"CMIS Browser Binding endpoint is reachable and returned parseable JSON."
|
|
]
|
|
if expected_gaps:
|
|
observations.append(
|
|
"Unsupported optional capabilities were accepted as known gaps: "
|
|
+ ", ".join(item["requirement_ref"] for item in expected_gaps)
|
|
+ "."
|
|
)
|
|
return {
|
|
"result": "pass",
|
|
"observations": observations,
|
|
"facts": facts,
|
|
"artifact_refs": artifact_refs,
|
|
}
|
|
|
|
|
|
def _browser_endpoint(target: dict[str, Any]) -> dict[str, Any] | None:
|
|
for endpoint in target.get("endpoints", []):
|
|
if endpoint.get("binding") == "cmis-browser":
|
|
return endpoint
|
|
return None
|
|
|
|
|
|
def _timeout_seconds(context: dict[str, Any]) -> float:
|
|
runtime_policy = context["assessment_profile"].get("runtime_policy", {})
|
|
configured = runtime_policy.get("timeout_seconds", 5)
|
|
if not isinstance(configured, (int, float)):
|
|
return 5.0
|
|
return max(1.0, min(float(configured), 10.0))
|
|
|
|
|
|
def _parse_json(body: bytes) -> Any:
|
|
try:
|
|
return json.loads(body.decode("utf-8"))
|
|
except (UnicodeDecodeError, json.JSONDecodeError):
|
|
return None
|
|
|
|
|
|
def _repository_facts(value: Any, context: dict[str, Any]) -> dict[str, Any]:
|
|
if not isinstance(value, dict):
|
|
return {"repository_shape": "unknown"}
|
|
|
|
if "repositoryId" in value:
|
|
repository_id = str(value["repositoryId"])
|
|
return {
|
|
"repository_shape": "single-repository-info",
|
|
"repository_ids": [repository_id],
|
|
"selected_repository_id": repository_id,
|
|
"cmis_version_supported": value.get("cmisVersionSupported"),
|
|
"capabilities_present": isinstance(value.get("capabilities"), dict),
|
|
"capability_flags": _capability_flags(value),
|
|
"capability_posture": _capability_posture(value, context),
|
|
}
|
|
|
|
repositories: dict[str, dict[str, Any]] = {}
|
|
for key, child in value.items():
|
|
if isinstance(child, dict) and (
|
|
"repositoryId" in child or "repositoryName" in child
|
|
):
|
|
repositories[str(child.get("repositoryId", key))] = child
|
|
|
|
if repositories:
|
|
selected_repository_id = _selected_repository_id(repositories, context)
|
|
selected_repository = repositories[selected_repository_id]
|
|
return {
|
|
"repository_shape": "repository-map",
|
|
"repository_ids": sorted(repositories),
|
|
"selected_repository_id": selected_repository_id,
|
|
"cmis_version_supported": selected_repository.get("cmisVersionSupported"),
|
|
"capabilities_present": isinstance(selected_repository.get("capabilities"), dict),
|
|
"capability_flags": _capability_flags(selected_repository),
|
|
"capability_posture": _capability_posture(selected_repository, context),
|
|
}
|
|
|
|
return {
|
|
"repository_shape": "object",
|
|
"top_level_keys": sorted(str(key) for key in value.keys())[:20],
|
|
}
|
|
|
|
|
|
def _selected_repository_id(
|
|
repositories: dict[str, dict[str, Any]],
|
|
context: dict[str, Any],
|
|
) -> str:
|
|
configured = _opencmis_policy(context).get("repository_id")
|
|
if isinstance(configured, str) and configured in repositories:
|
|
return configured
|
|
return sorted(repositories)[0]
|
|
|
|
|
|
def _capability_flags(repository_info: dict[str, Any]) -> dict[str, Any]:
|
|
capabilities = repository_info.get("capabilities", {})
|
|
return dict(capabilities) if isinstance(capabilities, dict) else {}
|
|
|
|
|
|
def _capability_posture(
|
|
repository_info: dict[str, Any],
|
|
context: dict[str, Any],
|
|
) -> list[dict[str, Any]]:
|
|
target = context["target_profile"]
|
|
declared = set(target.get("declared_capabilities", []))
|
|
known_gap_refs = {
|
|
requirement_ref: gap["id"]
|
|
for gap in target.get("known_gaps", [])
|
|
for requirement_ref in gap.get("requirement_refs", [])
|
|
}
|
|
refs = sorted(declared | set(known_gap_refs))
|
|
flags = _capability_flags(repository_info)
|
|
posture = []
|
|
for requirement_ref in refs:
|
|
support = _requirement_support(requirement_ref, flags)
|
|
if support is True:
|
|
status = "supported"
|
|
elif support is False and requirement_ref in known_gap_refs:
|
|
status = "expected_gap"
|
|
elif support is False:
|
|
status = "unsupported"
|
|
else:
|
|
status = "unknown"
|
|
posture.append(
|
|
{
|
|
"requirement_ref": requirement_ref,
|
|
"status": status,
|
|
"known_gap_ref": known_gap_refs.get(requirement_ref),
|
|
"flag_refs": _flag_refs(requirement_ref),
|
|
}
|
|
)
|
|
return posture
|
|
|
|
|
|
def _requirement_support(requirement_ref: str, flags: dict[str, Any]) -> bool | None:
|
|
if requirement_ref == "cmis.repository-info":
|
|
return True
|
|
flag_refs = _flag_refs(requirement_ref)
|
|
if not flag_refs:
|
|
return None
|
|
values = [flags[key] for key in flag_refs if key in flags]
|
|
if not values:
|
|
return None
|
|
return any(_flag_supports(value) for value in values)
|
|
|
|
|
|
def _flag_refs(requirement_ref: str) -> list[str]:
|
|
return {
|
|
"cmis.query": ["capabilityQuery"],
|
|
"cmis.acl": ["capabilityACL"],
|
|
"cmis.navigation-services": [
|
|
"capabilityGetDescendants",
|
|
"capabilityGetFolderTree",
|
|
],
|
|
"cmis.relationships": ["capabilityJoin"],
|
|
"cmis.change-log": ["capabilityChanges"],
|
|
"cmis.versioning": [
|
|
"capabilityVersionSpecificFiling",
|
|
"capabilityPWCSearchable",
|
|
"capabilityPWCUpdatable",
|
|
],
|
|
}.get(requirement_ref, [])
|
|
|
|
|
|
def _flag_supports(value: Any) -> bool:
|
|
if isinstance(value, bool):
|
|
return value
|
|
if isinstance(value, str):
|
|
return value.lower() not in {"", "false", "none", "no"}
|
|
return value is not None
|
|
|
|
|
|
def _opencmis_policy(context: dict[str, Any]) -> dict[str, Any]:
|
|
policy = context["assessment_profile"].get("runtime_policy", {}).get("opencmis_tck", {})
|
|
return policy if isinstance(policy, dict) else {}
|
|
|
|
|
|
def _write_response_artifacts(
|
|
context: dict[str, Any],
|
|
status_code: int,
|
|
content_type: str,
|
|
headers: dict[str, str],
|
|
body: bytes,
|
|
) -> list[str]:
|
|
run_dir = Path(context["run_dir"])
|
|
artifact_dir = run_dir / "artifacts" / "open-cmis-tck" / "preflight"
|
|
artifact_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
response_ref = "artifacts/open-cmis-tck/preflight/response-body.bin"
|
|
metadata_ref = "artifacts/open-cmis-tck/preflight/response-metadata.json"
|
|
|
|
(run_dir / response_ref).write_bytes(body)
|
|
(run_dir / metadata_ref).write_text(
|
|
json.dumps(
|
|
{
|
|
"status_code": status_code,
|
|
"content_type": content_type,
|
|
"headers": headers,
|
|
"byte_count": len(body),
|
|
},
|
|
indent=2,
|
|
sort_keys=True,
|
|
)
|
|
+ "\n",
|
|
encoding="utf-8",
|
|
)
|
|
return [metadata_ref, response_ref]
|