# File: open-cmis-tck/tests/test_open_cmis_tck.py
# (Python source; the file viewer reported 343 lines, 13 KiB)

from __future__ import annotations
import json
import threading
import unittest
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
from tempfile import TemporaryDirectory
from guide_board.discovery import discover_extensions
from guide_board.execution import run_assessment
from guide_board.planning import build_run_plan, validate_assessment_profile
# Directory one level above the directory containing this file (the
# extension checkout when this file lives in its ``tests/`` folder).
ROOT = Path(__file__).resolve().parents[1]
# Sibling directory named "guide-board"; passed as the first argument to the
# guide_board discovery, planning, and execution calls below.
CORE_ROOT = ROOT.parent / "guide-board"
class OpenCmisTckExtensionTests(unittest.TestCase):
    """Exercise the ``open-cmis-tck`` extension through the guide_board API.

    Covers extension discovery, run-plan construction, and assessment
    execution. Execution tests either talk to a local stub HTTP server
    (``_CmisHandler``) or to a target written by ``_write_failing_target``.
    """

    def _start_cmis_server(self) -> HTTPServer:
        """Start the stub CMIS server on an ephemeral loopback port.

        Teardown is registered with ``addCleanup``; cleanups run in reverse
        registration order, so the sequence is ``shutdown`` -> ``join`` ->
        ``server_close`` even when the test body raises.

        Returns:
            The running server; read ``server_port`` for the bound port.
        """
        server = HTTPServer(("127.0.0.1", 0), _CmisHandler)
        self.addCleanup(server.server_close)
        thread = threading.Thread(target=server.serve_forever, daemon=True)
        thread.start()
        # Registered only after the thread is serving: shutdown() blocks
        # until serve_forever() acknowledges it.
        self.addCleanup(thread.join, timeout=5)
        self.addCleanup(server.shutdown)
        return server

    @staticmethod
    def _load_normalized(run_dir: Path, name: str) -> list[dict]:
        """Return the ``name`` entry of ``<run_dir>/normalized/<name>.json``."""
        document = json.loads(
            (run_dir / "normalized" / f"{name}.json").read_text(encoding="utf-8")
        )
        return document[name]

    def test_extension_manifest_discovers_from_repo_root(self) -> None:
        """Discovery with ``[ROOT]`` yields ``open-cmis-tck`` as external."""
        extensions = {
            extension.id: extension
            for extension in discover_extensions(CORE_ROOT, [ROOT])
        }

        self.assertIn("open-cmis-tck", extensions)
        discovered = extensions["open-cmis-tck"]
        self.assertEqual(discovered.source, "external")
        self.assertEqual(discovered.path, ROOT)

    def test_builds_cmis_baseline_plan_from_external_extension(self) -> None:
        """The shipped baseline profiles produce a three-step run plan."""
        profiles = ROOT / "profiles"
        assessment_path = profiles / "assessments" / "cmis-browser-baseline.json"
        target_path = profiles / "targets" / "kontextual-cmis-compat.json"

        assessment = validate_assessment_profile(assessment_path)
        plan = build_run_plan(CORE_ROOT, target_path, assessment_path, [ROOT])

        self.assertEqual(assessment["extension_refs"], ["open-cmis-tck"])
        snapshot = plan["extension_snapshots"][0]
        self.assertEqual(snapshot["id"], "open-cmis-tck")
        self.assertEqual(snapshot["source"], "external")
        self.assertEqual(snapshot["path"], str(ROOT))
        self.assertEqual(len(plan["ordered_steps"]), 3)

    def test_runs_cmis_preflight_against_local_endpoint(self) -> None:
        """An assessment with no check groups completes against the stub."""
        server = self._start_cmis_server()

        # Everything that touches the run directory stays inside this block.
        with TemporaryDirectory() as temporary_directory:
            temp_root = Path(temporary_directory)
            target_path = temp_root / "target.json"
            assessment_path = temp_root / "assessment.json"
            _write_target(target_path, server.server_port, "local-cmis-test")
            _write_assessment(
                assessment_path,
                "local-cmis-preflight",
                "local-cmis-test",
                [],
                None,
            )

            result = run_assessment(
                CORE_ROOT,
                target_path,
                assessment_path,
                temp_root / "run",
                [ROOT],
            )

            run_dir = Path(result["run_dir"])
            evidence = self._load_normalized(run_dir, "evidence")
            package = json.loads(
                (run_dir / "reports" / "assessment-package.json").read_text(
                    encoding="utf-8"
                )
            )

            self.assertEqual(result["status"], "completed")
            self.assertEqual(evidence[0]["result"], "pass")
            self.assertEqual(
                evidence[0]["facts"]["repository_ids"],
                ["local-test-repository"],
            )
            self.assertEqual(len(package["artifact_manifest"]), 2)
            metadata_path = (
                run_dir
                / "artifacts"
                / "open-cmis-tck"
                / "preflight"
                / "response-metadata.json"
            )
            self.assertTrue(metadata_path.exists())

    def test_runs_cmis_tck_command_wrapper_boundary(self) -> None:
        """Selecting ``repository-type`` yields a waived ``blocked`` result."""
        server = self._start_cmis_server()

        with TemporaryDirectory() as temporary_directory:
            temp_root = Path(temporary_directory)
            target_path = temp_root / "target.json"
            assessment_path = temp_root / "assessment.json"
            waiver_path = temp_root / "waivers.json"
            _write_target(
                target_path, server.server_port, "local-cmis-command-test"
            )
            _write_assessment(
                assessment_path,
                "local-cmis-command-boundary",
                "local-cmis-command-test",
                ["repository-type"],
                str(waiver_path),
            )
            _write_command_waiver(waiver_path, "local-cmis-command-test")

            result = run_assessment(
                CORE_ROOT,
                target_path,
                assessment_path,
                temp_root / "run",
                [ROOT],
            )

            run_dir = Path(result["run_dir"])
            evidence = self._load_normalized(run_dir, "evidence")
            findings = self._load_normalized(run_dir, "findings")
            mappings = self._load_normalized(run_dir, "mappings")

            self.assertEqual(result["status"], "blocked")
            self.assertEqual(evidence[0]["result"], "pass")
            self.assertEqual(evidence[1]["result"], "blocked")
            self.assertEqual(evidence[1]["facts"]["runner_kind"], "command")
            # Either reason is accepted by this test.
            self.assertIn(
                evidence[1]["facts"]["blocked_reason"],
                {"missing_dependency", "tck_invocation_not_configured"},
            )
            self.assertEqual(
                findings[0]["waiver_ref"], "local-command-wrapper-bootstrap"
            )
            self.assertEqual(
                {mapping["target_id"] for mapping in mappings},
                {"repository-type"},
            )

    def test_preflight_failure_blocks_downstream_checks(self) -> None:
        """A failing preflight marks the selected check group as blocked."""
        with TemporaryDirectory() as temporary_directory:
            temp_root = Path(temporary_directory)
            target_path = temp_root / "target.json"
            assessment_path = temp_root / "assessment.json"
            _write_failing_target(target_path)
            _write_assessment(
                assessment_path,
                "local-cmis-preflight-gate",
                "local-cmis-preflight-failure",
                ["repository-type"],
                None,
            )

            result = run_assessment(
                CORE_ROOT,
                target_path,
                assessment_path,
                temp_root / "run",
                [ROOT],
            )

            run_dir = Path(result["run_dir"])
            evidence = self._load_normalized(run_dir, "evidence")
            findings = self._load_normalized(run_dir, "findings")

            self.assertEqual(result["status"], "infrastructure_error")
            self.assertEqual(evidence[0]["result"], "infrastructure_error")
            self.assertEqual(evidence[1]["result"], "blocked")
            self.assertEqual(
                evidence[1]["facts"]["blocked_reason"], "preflight_failed"
            )
            self.assertFalse((run_dir / "artifacts" / "runner-contexts").exists())
            self.assertEqual(findings[1]["classification"], "preflight_failed")
            self.assertTrue(findings[1]["expected"])
def _write_target(
    path: Path,
    port: int,
    target_id: str,
    subject_name: str = "Local CMIS Test",
) -> None:
    """Write a CMIS browser-binding target profile as JSON to ``path``.

    Args:
        path: File to create or overwrite (UTF-8).
        port: Loopback port placed in the single endpoint URL.
        target_id: Value of the profile's ``id`` field.
        subject_name: Value of the profile's ``subject_name`` field.
    """
    profile = {
        "id": target_id,
        "subject_type": "cmis-browser-binding-endpoint",
        "subject_name": subject_name,
        "environment": "test",
        "scope": ["preflight", "tck-wrapper"],
        "endpoints": [
            {
                "id": "browser-binding",
                "url": f"http://127.0.0.1:{port}/cmis/browser",
                "binding": "cmis-browser",
            }
        ],
        "artifacts": [],
        "credentials_ref": None,
        "declared_capabilities": ["cmis.repository-info"],
        "known_gaps": [],
    }
    path.write_text(json.dumps(profile), encoding="utf-8")


def _write_failing_target(path: Path) -> None:
    """Write the ``local-cmis-preflight-failure`` target profile to ``path``.

    Identical to the profile from ``_write_target`` except for its id, subject
    name, and endpoint port.
    """
    # NOTE(review): port 9 is presumably chosen because nothing listens there,
    # so the preflight request cannot succeed -- confirm on CI hosts.
    _write_target(
        path,
        9,
        "local-cmis-preflight-failure",
        subject_name="Local CMIS Preflight Failure",
    )
def _write_assessment(
    path: Path,
    assessment_id: str,
    target_id: str,
    check_groups: list[str],
    waiver_ref: str | None,
) -> None:
    """Write an assessment profile for ``open-cmis-tck`` as JSON to ``path``.

    Args:
        path: File to create or overwrite (UTF-8).
        assessment_id: Value of the profile's ``id`` field.
        target_id: Stored as ``target_profile_ref``.
        check_groups: Check groups selected for the ``open-cmis-tck``
            extension; may be empty.
        waiver_ref: Stored as ``waivers_ref``; ``None`` is written as JSON
            ``null``.
    """
    profile = {
        "id": assessment_id,
        "framework_refs": ["cmis.browser-binding.compatibility.v1"],
        "extension_refs": ["open-cmis-tck"],
        "target_profile_ref": target_id,
        "selected_check_groups": {"open-cmis-tck": check_groups},
        "expectations_ref": None,
        "waivers_ref": waiver_ref,
        "output_policy": {
            "report_formats": ["json", "markdown"],
            "artifact_retention": "summary-only",
        },
        "retention_policy": {
            "summary_days": 365,
            "raw_artifact_days": 0,
        },
        "runtime_policy": {
            "offline": False,
            "timeout_seconds": 15,
        },
    }
    path.write_text(json.dumps(profile), encoding="utf-8")
def _write_command_waiver(path: Path, target_id: str) -> None:
    """Write a waiver file with one approved waiver as JSON to ``path``.

    The single waiver, ``local-command-wrapper-bootstrap``, references the
    ``repository-type`` check group of ``open-cmis-tck`` and the ``blocked``
    result.

    Args:
        path: File to create or overwrite (UTF-8).
        target_id: Stored as ``target_profile_ref``.
    """
    waiver = {
        "id": "local-command-wrapper-bootstrap",
        "scope": "test",
        "requirement_refs": [],
        "check_refs": ["check-group:open-cmis-tck:repository-type"],
        "result_refs": ["blocked"],
        "classification_refs": [],
        "reason": "The test stops before invoking the Java/Maven TCK.",
        "owner": "open-cmis-tck-tests",
        "approved_by": "open-cmis-tck-tests",
        "created_at": "2026-05-07",
        "expires_at": "2099-12-31",
        "review_status": "approved",
    }
    document = {
        "id": "local-cmis-command-waivers",
        "target_profile_ref": target_id,
        "waivers": [waiver],
    }
    path.write_text(json.dumps(document), encoding="utf-8")
class _CmisHandler(BaseHTTPRequestHandler):
    """Stub HTTP handler that answers every GET with one repository entry.

    The response is a JSON object keyed by repository id
    (``local-test-repository``), sent with status 200 regardless of the
    requested path.
    """

    def do_GET(self) -> None:
        """Send the fixed repository document as ``application/json``."""
        repositories = {
            "local-test-repository": {
                "repositoryId": "local-test-repository",
                "repositoryName": "Local Test Repository",
                "cmisVersionSupported": "1.1",
                "capabilities": {},
            }
        }
        body = json.dumps(repositories).encode("utf-8")

        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        # Length is computed from the encoded bytes actually written below.
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format: str, *args: object) -> None:  # noqa: A002
        """Discard request log lines instead of emitting them.

        The parameter name ``format`` mirrors the base-class signature.
        """
        return
# Run the tests in this module when the file is executed directly.
if __name__ == "__main__":
    unittest.main()