generated from coulomb/repo-seed
OpenCMIS in-memory server
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import http.client
|
||||
import json
|
||||
import os
|
||||
@@ -110,6 +111,25 @@ class OpenCmisTckExtensionTests(unittest.TestCase):
|
||||
profile = validate_target_profile(template_dir / name)
|
||||
self.assertEqual(profile["subject_type"], "cmis-browser-binding-endpoint")
|
||||
|
||||
def test_inmemory_pilot_profiles_validate(self) -> None:
|
||||
target = validate_target_profile(ROOT / "profiles" / "targets" / "opencmis-inmemory-local.json")
|
||||
assessment = validate_assessment_profile(
|
||||
ROOT / "profiles" / "assessments" / "cmis-browser-inmemory-pilot.json"
|
||||
)
|
||||
plan = build_run_plan(
|
||||
CORE_ROOT,
|
||||
ROOT / "profiles" / "targets" / "opencmis-inmemory-local.json",
|
||||
ROOT / "profiles" / "assessments" / "cmis-browser-inmemory-pilot.json",
|
||||
[ROOT],
|
||||
)
|
||||
|
||||
self.assertEqual(target["credentials_ref"], "env:OPENCMIS_INMEMORY_USER,OPENCMIS_INMEMORY_PASSWORD")
|
||||
self.assertEqual(assessment["target_profile_ref"], "opencmis-inmemory-local")
|
||||
self.assertEqual(
|
||||
[step["id"] for step in plan["ordered_steps"]],
|
||||
["preflight:open-cmis-tck", "check-group:open-cmis-tck:repository-type"],
|
||||
)
|
||||
|
||||
def test_bootstrap_reports_local_tck_runtime_posture(self) -> None:
|
||||
with TemporaryDirectory() as temporary_directory:
|
||||
output = Path(temporary_directory) / "runtime-summary.json"
|
||||
@@ -388,6 +408,61 @@ class OpenCmisTckExtensionTests(unittest.TestCase):
|
||||
thread.join(timeout=5)
|
||||
server.server_close()
|
||||
|
||||
def test_preflight_uses_env_credentials_for_basic_auth(self) -> None:
|
||||
server = HTTPServer(("127.0.0.1", 0), _BasicAuthCmisHandler)
|
||||
thread = threading.Thread(target=server.serve_forever)
|
||||
thread.daemon = True
|
||||
thread.start()
|
||||
original_user = os.environ.get("CMIS_AUTH_USER")
|
||||
original_password = os.environ.get("CMIS_AUTH_PASSWORD")
|
||||
os.environ["CMIS_AUTH_USER"] = "alice"
|
||||
os.environ["CMIS_AUTH_PASSWORD"] = "secret"
|
||||
try:
|
||||
with TemporaryDirectory() as temporary_directory:
|
||||
temp_root = Path(temporary_directory)
|
||||
target_path = temp_root / "target.json"
|
||||
assessment_path = temp_root / "assessment.json"
|
||||
_write_target(target_path, server.server_port, "local-cmis-auth-preflight")
|
||||
target = json.loads(target_path.read_text(encoding="utf-8"))
|
||||
target["credentials_ref"] = "env:CMIS_AUTH_USER,CMIS_AUTH_PASSWORD"
|
||||
target_path.write_text(json.dumps(target), encoding="utf-8")
|
||||
_write_assessment(
|
||||
assessment_path,
|
||||
"local-cmis-auth-preflight",
|
||||
"local-cmis-auth-preflight",
|
||||
[],
|
||||
None,
|
||||
)
|
||||
|
||||
result = run_assessment(
|
||||
CORE_ROOT,
|
||||
target_path,
|
||||
assessment_path,
|
||||
temp_root / "run",
|
||||
[ROOT],
|
||||
)
|
||||
evidence = json.loads(
|
||||
(Path(result["run_dir"]) / "normalized" / "evidence.json").read_text(
|
||||
encoding="utf-8"
|
||||
)
|
||||
)["evidence"]
|
||||
|
||||
self.assertEqual(result["status"], "completed")
|
||||
self.assertEqual(evidence[0]["result"], "pass")
|
||||
self.assertEqual(evidence[0]["facts"]["auth_mode"], "env")
|
||||
finally:
|
||||
if original_user is None:
|
||||
os.environ.pop("CMIS_AUTH_USER", None)
|
||||
else:
|
||||
os.environ["CMIS_AUTH_USER"] = original_user
|
||||
if original_password is None:
|
||||
os.environ.pop("CMIS_AUTH_PASSWORD", None)
|
||||
else:
|
||||
os.environ["CMIS_AUTH_PASSWORD"] = original_password
|
||||
server.shutdown()
|
||||
thread.join(timeout=5)
|
||||
server.server_close()
|
||||
|
||||
def test_runs_cmis_tck_command_wrapper_boundary(self) -> None:
|
||||
server = HTTPServer(("127.0.0.1", 0), _CmisHandler)
|
||||
thread = threading.Thread(target=server.serve_forever)
|
||||
@@ -985,5 +1060,19 @@ class _CmisHandler(BaseHTTPRequestHandler):
|
||||
return
|
||||
|
||||
|
||||
class _BasicAuthCmisHandler(_CmisHandler):
|
||||
def do_GET(self) -> None:
|
||||
expected = "Basic " + base64.b64encode(b"alice:secret").decode("ascii")
|
||||
if self.headers.get("Authorization") != expected:
|
||||
body = b"unauthorized"
|
||||
self.send_response(401)
|
||||
self.send_header("Content-Type", "text/plain")
|
||||
self.send_header("Content-Length", str(len(body)))
|
||||
self.end_headers()
|
||||
self.wfile.write(body)
|
||||
return
|
||||
super().do_GET()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user