Files
open-cmis-tck/scripts/opencmis_inmemory_server.py
2026-05-08 07:51:42 +02:00

532 lines
18 KiB
Python

#!/usr/bin/env python3
"""Prepare and run a local Apache Chemistry OpenCMIS in-memory server."""
from __future__ import annotations
import argparse
import json
import os
import shutil
import signal
import subprocess
import tarfile
import time
import urllib.error
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
# Maven coordinates of the artifacts that prepare_runtime() copies into the runtime directory.
SERVER_WAR_COORDINATE = "org.apache.chemistry.opencmis:chemistry-opencmis-server-inmemory:1.1.0:war"
TOMCAT_COORDINATE = "org.apache.tomcat:tomcat:9.0.117:tar.gz"
# Additional API jars; _install_compat_libs() copies the matching files into Tomcat's lib/ directory.
JAVAEE_COMPAT_COORDINATES = [
    "javax.xml.ws:jaxws-api:2.3.1:jar",
    "javax.jws:javax.jws-api:1.1:jar",
    "javax.xml.soap:javax.xml.soap-api:1.4.0:jar",
    "javax.annotation:javax.annotation-api:1.3.2:jar",
]
# Defaults for the --context-path and --port command-line options.
DEFAULT_CONTEXT_PATH = "/inmemory"
DEFAULT_PORT = 18080
def main() -> int:
    """Parse the command line, run the chosen sub-command and print its summary as JSON.

    Returns:
        0 when the resulting status is "ready", "running" or "stopped", otherwise 2.
    """
    default_dir = _default_runtime_dir()
    parser = argparse.ArgumentParser()
    commands = parser.add_subparsers(dest="command", required=True)

    def add_command(name: str) -> argparse.ArgumentParser:
        # Every sub-command accepts --runtime-dir as its first option.
        sub = commands.add_parser(name)
        sub.add_argument("--runtime-dir", type=Path, default=default_dir)
        return sub

    add_command("prepare")

    start_parser = add_command("start")
    start_parser.add_argument("--port", type=int, default=DEFAULT_PORT)
    start_parser.add_argument("--context-path", default=DEFAULT_CONTEXT_PATH)
    start_parser.add_argument("--timeout-seconds", type=int, default=90)
    start_parser.add_argument("--no-prepare", action="store_true")

    add_command("stop")

    probe_parser = add_command("probe")
    probe_parser.add_argument("--port", type=int, default=DEFAULT_PORT)
    probe_parser.add_argument("--context-path", default=DEFAULT_CONTEXT_PATH)
    probe_parser.add_argument("--timeout-seconds", type=int, default=5)

    add_command("status")

    options = parser.parse_args()

    # One zero-argument callable per sub-command; only the selected one is invoked.
    handlers = {
        "prepare": lambda: prepare_runtime(options.runtime_dir),
        "start": lambda: start_server(
            runtime_dir=options.runtime_dir,
            port=options.port,
            context_path=options.context_path,
            timeout_seconds=options.timeout_seconds,
            prepare=not options.no_prepare,
        ),
        "stop": lambda: stop_server(options.runtime_dir),
        "probe": lambda: probe_server(
            options.runtime_dir,
            options.port,
            options.context_path,
            options.timeout_seconds,
        ),
        "status": lambda: status_server(options.runtime_dir),
    }
    handler = handlers.get(options.command)
    if handler is None:
        raise AssertionError(options.command)
    outcome = handler()

    print(json.dumps(outcome, indent=2, sort_keys=True))
    if outcome["status"] in {"ready", "running", "stopped"}:
        return 0
    return 2
def prepare_runtime(runtime_dir: Path) -> dict[str, Any]:
    """Download the server WAR, Tomcat and the compatibility jars, then unpack Tomcat.

    Each artifact is copied into ``runtime_dir`` by running the Maven dependency
    plugin's ``copy`` goal once per coordinate. Afterwards the Tomcat archive is
    extracted and the compatibility jars are installed into it.

    Args:
        runtime_dir: Directory that receives the downloads; created if missing.

    Returns:
        A summary dict with status "ready" on success. When Maven cannot be
        started, times out or exits non-zero, or when Tomcat cannot be unpacked,
        the status is "blocked" and ``diagnostics`` describes the failure.
    """
    runtime_dir = runtime_dir.resolve()
    runtime_dir.mkdir(parents=True, exist_ok=True)
    maven = _maven_executable()
    # Loop invariants: the POM path and the tool environment do not depend on the coordinate.
    pom_path = _extension_root() / "runtime" / "opencmis-tck" / "pom.xml"
    env = _tool_env()
    downloads: list[dict[str, Any]] = []
    for coordinate in [SERVER_WAR_COORDINATE, TOMCAT_COORDINATE, *JAVAEE_COMPAT_COORDINATES]:
        try:
            completed = subprocess.run(
                [
                    str(maven),
                    "-q",
                    "-f",
                    str(pom_path),
                    "org.apache.maven.plugins:maven-dependency-plugin:3.7.1:copy",
                    f"-Dartifact={coordinate}",
                    f"-DoutputDirectory={runtime_dir}",
                ],
                capture_output=True,
                text=True,
                timeout=300,
                check=False,
                env=env,
            )
        except (OSError, subprocess.TimeoutExpired) as exc:
            # A missing/non-executable mvn or a hung download is reported like any
            # other resolution failure instead of surfacing as a traceback.
            downloads.append({"coordinate": coordinate, "error": str(exc)})
            return _summary(
                runtime_dir,
                "blocked",
                diagnostics=[
                    {
                        "severity": "error",
                        "field": "runtime.maven",
                        "message": f"Maven could not be run for {coordinate}: {exc}",
                    }
                ],
                downloads=downloads,
            )
        downloads.append(
            {
                "coordinate": coordinate,
                "returncode": completed.returncode,
                # Keep only the tail of the output so the summary stays small.
                "stdout": completed.stdout[-2000:],
                "stderr": completed.stderr[-4000:],
            }
        )
        if completed.returncode != 0:
            return _summary(
                runtime_dir,
                "blocked",
                diagnostics=[
                    {
                        "severity": "error",
                        "field": "runtime.maven",
                        "message": f"Maven could not resolve {coordinate}.",
                    }
                ],
                downloads=downloads,
            )
    try:
        tomcat_home = _prepare_tomcat(runtime_dir)
        _install_compat_libs(runtime_dir, tomcat_home)
    except (OSError, ValueError, tarfile.TarError) as exc:
        return _summary(
            runtime_dir,
            "blocked",
            diagnostics=[
                {
                    "severity": "error",
                    "field": "runtime.tomcat",
                    "message": f"Tomcat runtime could not be prepared: {exc}",
                }
            ],
            downloads=downloads,
        )
    return _summary(runtime_dir, "ready", downloads=downloads)
def start_server(
    runtime_dir: Path,
    port: int,
    context_path: str,
    timeout_seconds: int,
    prepare: bool = True,
) -> dict[str, Any]:
    """Deploy the WAR into Tomcat, launch it and wait until the probe succeeds.

    Args:
        runtime_dir: Directory holding the downloaded artifacts and runtime state.
        port: HTTP port written into Tomcat's server.xml and used for probing.
        context_path: Web application context path.
        timeout_seconds: How long to wait for the server to answer the probe.
        prepare: When true, run prepare_runtime() first.

    Returns:
        A summary dict. Status is "running" once the probe reports "ready" (or a
        server recorded in the state file is already running); otherwise
        "blocked" with diagnostics and, where available, the tail of the log.
    """
    runtime_dir = runtime_dir.resolve()
    if prepare:
        prepared = prepare_runtime(runtime_dir)
        if prepared["status"] != "ready":
            return prepared
    war = _server_war(runtime_dir)
    tomcat_home = _tomcat_home(runtime_dir)
    if war is None or tomcat_home is None:
        return _summary(
            runtime_dir,
            "blocked",
            diagnostics=[
                {
                    "severity": "error",
                    "field": "runtime.artifacts",
                    "message": "OpenCMIS in-memory WAR or Tomcat runtime is missing; run prepare first.",
                }
            ],
        )
    existing = status_server(runtime_dir)
    if existing["status"] == "running":
        return existing
    logs_dir = runtime_dir / "logs"
    logs_dir.mkdir(parents=True, exist_ok=True)
    log_path = logs_dir / "opencmis-inmemory.log"
    state_path = runtime_dir / "server-state.json"
    _deploy_war(tomcat_home, war, context_path, port)
    command = [
        str(tomcat_home / "bin" / "catalina.sh"),
        "run",
    ]
    # The child receives its own copy of the descriptor, so the parent's handle
    # is closed as soon as Popen returns (or fails) instead of being leaked.
    with log_path.open("ab") as log_handle:
        process = subprocess.Popen(
            command,
            cwd=tomcat_home,
            stdout=log_handle,
            stderr=subprocess.STDOUT,
            # Own session => own process group, which stop_server() signals as a whole.
            start_new_session=True,
            env=_tool_env(),
        )
    state = {
        "pid": process.pid,
        "command": command,
        "port": port,
        "context_path": _normalized_context_path(context_path),
        "container": "tomcat",
        "tomcat_home": str(tomcat_home),
        "browser_url": _browser_url(port, context_path),
        "log_path": str(log_path),
        "started_at": _now(),
    }
    state_path.write_text(json.dumps(state, indent=2, sort_keys=True) + "\n", encoding="utf-8")
    # Monotonic clock: the deadline must not move when the wall clock is adjusted.
    deadline = time.monotonic() + timeout_seconds
    probe_result: dict[str, Any] = {"status": "blocked"}
    while time.monotonic() < deadline:
        if process.poll() is not None:
            return _summary(
                runtime_dir,
                "blocked",
                server=state,
                diagnostics=[
                    {
                        "severity": "error",
                        "field": "server.process",
                        "message": f"OpenCMIS in-memory server exited with {process.returncode}.",
                    }
                ],
                log_tail=_read_tail(log_path),
            )
        probe_result = probe_server(runtime_dir, port, context_path, timeout_seconds=5)
        if probe_result["status"] == "ready":
            return _summary(runtime_dir, "running", server=state, probe=probe_result)
        time.sleep(1)
    # NOTE(review): the child has not been reaped at this point, so the
    # os.kill(pid, 0) liveness check used by stop_server() may keep reporting it
    # as running until its grace period ends — confirm and reap here if so.
    stop_server(runtime_dir)
    return _summary(
        runtime_dir,
        "blocked",
        server=state,
        probe=probe_result,
        diagnostics=[
            {
                "severity": "error",
                "field": "server.probe",
                "message": f"OpenCMIS in-memory server did not become ready within {timeout_seconds} seconds.",
            }
        ],
        log_tail=_read_tail(log_path),
    )
def stop_server(runtime_dir: Path) -> dict[str, Any]:
    """Stop the server recorded in the state file and delete that file.

    The recorded process group gets SIGTERM, then up to 20 seconds to exit
    (polled every half second) before SIGKILL is sent.

    Returns:
        A summary dict with status "stopped"; it carries the recorded server
        state when a state file was present.
    """
    root = runtime_dir.resolve()
    recorded = _load_state(root)
    if not recorded:
        return _summary(root, "stopped")

    server_pid = recorded.get("pid")
    if isinstance(server_pid, int) and _is_process_running(server_pid):
        _terminate_process_group(server_pid, signal.SIGTERM)
        give_up_at = time.time() + 20
        while True:
            if time.time() >= give_up_at or not _is_process_running(server_pid):
                break
            time.sleep(0.5)
        if _is_process_running(server_pid):
            # Graceful shutdown did not finish in time.
            _terminate_process_group(server_pid, signal.SIGKILL)

    state_file = root / "server-state.json"
    state_file.unlink(missing_ok=True)
    return _summary(root, "stopped", server=recorded)
def probe_server(
    runtime_dir: Path,
    port: int,
    context_path: str,
    timeout_seconds: int = 5,
) -> dict[str, Any]:
    """Request the candidate server URLs until one of them returns a JSON body.

    Args:
        runtime_dir: Directory that receives ``browser-probe.json`` and the summary.
        port: HTTP port of the server on 127.0.0.1.
        context_path: Web application context path.
        timeout_seconds: Per-request timeout.

    Returns:
        A summary dict. Status is "ready" when a response parsed as JSON (the raw
        body is saved to ``browser-probe.json``); otherwise "blocked". Either way
        ``probe["attempts"]`` lists what each tried URL returned.
    """
    import http.client  # local import: only needed for the exception type below

    runtime_dir = runtime_dir.resolve()
    # Both the probe response and the summary are written here, so the
    # directory has to exist even when probing before any prepare/start.
    runtime_dir.mkdir(parents=True, exist_ok=True)
    output_path = runtime_dir / "browser-probe.json"
    attempts: list[dict[str, Any]] = []
    for url in _probe_urls(port, context_path):
        # Only the network exchange sits inside the try, so that failures while
        # writing the result files are not misreported as probe errors.
        try:
            request = urllib.request.Request(
                url,
                headers={"Accept": "application/json, */*;q=0.1"},
            )
            with urllib.request.urlopen(request, timeout=timeout_seconds) as response:
                http_status = response.status
                content_type = response.headers.get("Content-Type")
                body = response.read(1024 * 1024)
        except urllib.error.HTTPError as exc:
            try:
                error_body = exc.read(64 * 1024)
            except (OSError, http.client.HTTPException):
                error_body = b""
            attempts.append(
                {
                    "url": url,
                    "http_status": exc.code,
                    "content_type": exc.headers.get("Content-Type"),
                    "body_preview": _body_preview(error_body),
                }
            )
            continue
        except (OSError, http.client.HTTPException, UnicodeDecodeError) as exc:
            # OSError covers URLError, timeouts and connection resets raised while
            # reading; HTTPException covers malformed or truncated responses,
            # which are expected while the container is still starting.
            attempts.append({"url": url, "error": str(exc)})
            continue
        attempts.append(
            {
                "url": url,
                "http_status": http_status,
                "content_type": content_type,
                "body_preview": _body_preview(body),
            }
        )
        parsed = _parse_json(body)
        if parsed is None:
            continue
        output_path.write_bytes(body)
        return _summary(
            runtime_dir,
            "ready",
            probe={
                "url": url,
                "http_status": http_status,
                "repository_ids": _repository_ids(parsed),
                "response_path": str(output_path),
                "attempts": attempts,
            },
        )
    return _summary(runtime_dir, "blocked", probe={"attempts": attempts})
def status_server(runtime_dir: Path) -> dict[str, Any]:
    """Report whether the process recorded in the state file is alive.

    Returns:
        A summary dict with status "running" or "stopped". The recorded server
        state is included whenever a state file was found.
    """
    root = runtime_dir.resolve()
    recorded = _load_state(root)
    if not recorded:
        return _summary(root, "stopped")

    server_pid = recorded.get("pid")
    if isinstance(server_pid, int) and _is_process_running(server_pid):
        label = "running"
    else:
        label = "stopped"
    return _summary(root, label, server=recorded)
def _summary(runtime_dir: Path, status: str, **extra: Any) -> dict[str, Any]:
    """Build the result dict for a command and persist it as ``inmemory-summary.json``.

    Args:
        runtime_dir: Directory the summary file is written to; created if missing.
        status: Status label stored under ``"status"``.
        **extra: Additional entries merged into the summary; ``None`` values are dropped.

    Returns:
        The summary dict that was written.
    """
    summary = {
        "id": "opencmis-inmemory-server",
        "status": status,
        "created_at": _now(),
        "runtime_dir": str(runtime_dir.resolve()),
        "server_war_coordinate": SERVER_WAR_COORDINATE,
        "tomcat_coordinate": TOMCAT_COORDINATE,
        # Copy so that callers mutating the result cannot alter the module constant.
        "javaee_compat_coordinates": list(JAVAEE_COMPAT_COORDINATES),
        "default_repository_id": "A1",
        "default_browser_url": _browser_url(DEFAULT_PORT, DEFAULT_CONTEXT_PATH),
    }
    summary.update({key: value for key, value in extra.items() if value is not None})
    # status/stop/probe can run before prepare ever created the directory.
    runtime_dir.mkdir(parents=True, exist_ok=True)
    (runtime_dir / "inmemory-summary.json").write_text(
        json.dumps(summary, indent=2, sort_keys=True) + "\n",
        encoding="utf-8",
    )
    return summary
def _repository_ids(value: Any) -> list[str]:
if isinstance(value, dict) and "repositoryId" in value:
return [str(value["repositoryId"])]
if isinstance(value, dict):
ids = []
for key, item in value.items():
if isinstance(item, dict):
ids.append(str(item.get("repositoryId", key)))
return sorted(ids)
return []
def _parse_json(body: bytes) -> Any:
try:
return json.loads(body.decode("utf-8"))
except (json.JSONDecodeError, UnicodeDecodeError):
return None
def _body_preview(body: bytes, limit: int = 500) -> str:
return body[:limit].decode("utf-8", errors="replace")
def _probe_urls(port: int, context_path: str) -> list[str]:
    """List the URLs to probe, in order, for the server on 127.0.0.1."""
    root = "http://127.0.0.1:" + str(port) + _normalized_context_path(context_path)
    suffixes = ("/browser", "/browser/", "/cmis-endpoints.json", "/")
    return [root + suffix for suffix in suffixes]
def _server_war(runtime_dir: Path) -> Path | None:
matches = sorted(runtime_dir.glob("chemistry-opencmis-server-inmemory-*.war"))
return matches[-1] if matches else None
def _tomcat_archive(runtime_dir: Path) -> Path | None:
matches = sorted(runtime_dir.glob("tomcat-*.tar.gz"))
return matches[-1] if matches else None
def _tomcat_home(runtime_dir: Path) -> Path | None:
candidates = sorted((runtime_dir / "containers").glob("apache-tomcat-*"))
return candidates[-1] if candidates else None
def _prepare_tomcat(runtime_dir: Path) -> Path:
    """Return the unpacked Tomcat directory, extracting the downloaded archive if needed.

    Raises:
        ValueError: If no archive is present, or the archive did not produce an
            ``apache-tomcat-*`` directory under ``containers``.
    """
    already_unpacked = _tomcat_home(runtime_dir)
    if already_unpacked is not None:
        return already_unpacked

    tarball = _tomcat_archive(runtime_dir)
    if tarball is None:
        raise ValueError("Tomcat archive is missing")

    unpack_root = runtime_dir / "containers"
    unpack_root.mkdir(parents=True, exist_ok=True)
    with tarfile.open(tarball, "r:gz") as tar:
        _safe_extract(tar, unpack_root)

    unpacked = _tomcat_home(runtime_dir)
    if unpacked is None:
        raise ValueError("Tomcat archive did not extract an apache-tomcat-* directory")
    return unpacked
def _install_compat_libs(runtime_dir: Path, tomcat_home: Path) -> None:
lib_dir = tomcat_home / "lib"
for artifact_id in ["jaxws-api", "javax.jws-api", "javax.xml.soap-api", "javax.annotation-api"]:
matches = sorted(runtime_dir.glob(f"{artifact_id}-*.jar"))
if matches:
shutil.copy2(matches[-1], lib_dir / matches[-1].name)
def _deploy_war(tomcat_home: Path, war: Path, context_path: str, port: int) -> None:
    """Install ``war`` under the given context path and point server.xml at ``port``.

    Any previously deployed directory or WAR of the same name is removed first.
    An empty context path deploys as ``ROOT``. ``bin/catalina.sh`` is made
    executable at the end.
    """
    webapp_name = _normalized_context_path(context_path).strip("/") or "ROOT"
    webapps = tomcat_home / "webapps"
    deployed_war = webapps / f"{webapp_name}.war"

    for stale in (webapps / webapp_name, deployed_war):
        _remove_generated_webapp(stale)
    shutil.copy2(war, deployed_war)

    config_file = tomcat_home / "conf" / "server.xml"
    original_xml = config_file.read_text(encoding="utf-8")
    config_file.write_text(reconfigure_connector_port(original_xml, port), encoding="utf-8")

    launcher = tomcat_home / "bin" / "catalina.sh"
    launcher.chmod(0o755)
def reconfigure_connector_port(server_xml: str, port: int) -> str:
    """Return ``server_xml`` with the first HTTP/1.1 connector listening on ``port``.

    The first ``<Connector ... port="N" ... protocol="HTTP/1.1"`` tag is
    rewritten whatever its current port is, so a server.xml that was already
    reconfigured by an earlier run can be moved to another port. If no such tag
    exists, the first literal ``port="8080"`` is replaced instead.

    Args:
        server_xml: Contents of Tomcat's ``conf/server.xml``.
        port: Port number to write.

    Returns:
        The updated XML text; unchanged when nothing matched.
    """
    import re  # local import: the module does not import re at file level

    # [^>]* keeps the match inside one tag; \s before "port" rules out
    # attributes such as redirectPort.
    connector = re.compile(
        r'(<Connector\b[^>]*?\sport=")[0-9]+("[^>]*?\sprotocol="HTTP/1\.1")'
    )
    updated, replaced = connector.subn(
        lambda match: f"{match.group(1)}{port}{match.group(2)}",
        server_xml,
        count=1,
    )
    if replaced:
        return updated
    return server_xml.replace('port="8080"', f'port="{port}"', 1)
def _remove_generated_webapp(path: Path) -> None:
if not path.exists():
return
webapps_dir = path.parent.resolve()
resolved = path.resolve()
try:
resolved.relative_to(webapps_dir)
except ValueError as exc:
raise ValueError(f"Refusing to remove path outside Tomcat webapps: {path}") from exc
if resolved.is_dir():
shutil.rmtree(resolved)
else:
resolved.unlink()
def _safe_extract(handle: tarfile.TarFile, destination: Path) -> None:
destination = destination.resolve()
for member in handle.getmembers():
target = (destination / member.name).resolve()
try:
target.relative_to(destination)
except ValueError as exc:
raise ValueError(f"Archive member escapes destination: {member.name}") from exc
handle.extractall(destination)
def _load_state(runtime_dir: Path) -> dict[str, Any] | None:
state_path = runtime_dir / "server-state.json"
if not state_path.exists():
return None
value = json.loads(state_path.read_text(encoding="utf-8"))
return value if isinstance(value, dict) else None
def _is_process_running(pid: int) -> bool:
try:
os.kill(pid, 0)
except ProcessLookupError:
return False
except PermissionError:
return True
return True
def _terminate_process_group(pid: int, sig: signal.Signals) -> None:
try:
os.killpg(pid, sig)
except ProcessLookupError:
return
except PermissionError:
os.kill(pid, sig)
def _read_tail(path: Path, limit: int = 8000) -> str:
if not path.exists():
return ""
data = path.read_bytes()[-limit:]
return data.decode("utf-8", errors="replace")
def _java_executable() -> Path:
    """Return the bundled JDK's ``java`` if it exists, else the bare name ``java``."""
    bundled = _extension_root().joinpath(".local", "toolchains", "current-jdk", "bin", "java")
    if bundled.exists():
        return bundled
    return Path("java")
def _maven_executable() -> Path:
    """Return the bundled Maven's ``mvn`` if it exists, else the bare name ``mvn``."""
    bundled = _extension_root().joinpath(".local", "toolchains", "current-maven", "bin", "mvn")
    if bundled.exists():
        return bundled
    return Path("mvn")
def _tool_env() -> dict[str, str]:
    """Return a copy of the environment that prefers the bundled JDK and Maven.

    For each bundled toolchain directory that exists, its home variable
    (``JAVA_HOME`` / ``MAVEN_HOME``) is set and its ``bin`` directory is placed
    ahead of the inherited ``PATH``.
    """
    toolchains = _extension_root() / ".local" / "toolchains"
    environment = dict(os.environ)
    search_path = []
    for home_variable, folder in (("JAVA_HOME", "current-jdk"), ("MAVEN_HOME", "current-maven")):
        home = toolchains / folder
        if home.exists():
            environment[home_variable] = str(home)
            search_path.append(str(home / "bin"))
    search_path.append(environment.get("PATH", ""))
    # filter(None, ...) drops empty entries, e.g. an unset or empty inherited PATH.
    environment["PATH"] = os.pathsep.join(filter(None, search_path))
    return environment
def _browser_url(port: int, context_path: str) -> str:
    """Return the ``/browser`` URL of the server on 127.0.0.1 for the given port and context."""
    prefix = _normalized_context_path(context_path)
    return "http://127.0.0.1:" + str(port) + prefix + "/browser"
def _normalized_context_path(context_path: str) -> str:
value = "/" + context_path.strip("/")
return "" if value == "/" else value
def _default_runtime_dir() -> Path:
    """Return the default runtime directory, ``.local/opencmis-inmemory`` under the extension root."""
    return _extension_root().joinpath(".local", "opencmis-inmemory")
def _extension_root() -> Path:
return Path(__file__).resolve().parents[1]
def _now() -> str:
return datetime.now(timezone.utc).isoformat()
# Script entry point: run the CLI and use main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())