#!/usr/bin/env python3
"""Prepare and run a local Apache Chemistry OpenCMIS in-memory server."""

from __future__ import annotations

import argparse
import http.client
import json
import os
import re
import shutil
import signal
import subprocess
import tarfile
import time
import urllib.error
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

SERVER_WAR_COORDINATE = "org.apache.chemistry.opencmis:chemistry-opencmis-server-inmemory:1.1.0:war"
TOMCAT_COORDINATE = "org.apache.tomcat:tomcat:9.0.117:tar.gz"
JAVAEE_COMPAT_COORDINATES = [
    "javax.xml.ws:jaxws-api:2.3.1:jar",
    "javax.jws:javax.jws-api:1.1:jar",
    "javax.xml.soap:javax.xml.soap-api:1.4.0:jar",
    "javax.annotation:javax.annotation-api:1.3.2:jar",
]
DEFAULT_CONTEXT_PATH = "/inmemory"
DEFAULT_PORT = 18080

# Patterns used by reconfigure_connector_port to locate the live HTTP connector.
_XML_COMMENT = re.compile(r"<!--.*?-->", re.DOTALL)
_CONNECTOR_TAG = re.compile(r"<Connector\b[^>]*>")
_PORT_ATTRIBUTE = re.compile(r'(\bport=")[^"]*(")')


def main() -> int:
    """Parse the command line, run the chosen sub-command and print its summary as JSON.

    Returns:
        0 when the resulting status is ``ready``, ``running`` or ``stopped``,
        2 for any other status.
    """
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(dest="command", required=True)

    prepare = subparsers.add_parser("prepare")
    prepare.add_argument("--runtime-dir", type=Path, default=_default_runtime_dir())

    start = subparsers.add_parser("start")
    start.add_argument("--runtime-dir", type=Path, default=_default_runtime_dir())
    start.add_argument("--port", type=int, default=DEFAULT_PORT)
    start.add_argument("--context-path", default=DEFAULT_CONTEXT_PATH)
    start.add_argument("--timeout-seconds", type=int, default=90)
    start.add_argument("--no-prepare", action="store_true")

    stop = subparsers.add_parser("stop")
    stop.add_argument("--runtime-dir", type=Path, default=_default_runtime_dir())

    probe = subparsers.add_parser("probe")
    probe.add_argument("--runtime-dir", type=Path, default=_default_runtime_dir())
    probe.add_argument("--port", type=int, default=DEFAULT_PORT)
    probe.add_argument("--context-path", default=DEFAULT_CONTEXT_PATH)
    probe.add_argument("--timeout-seconds", type=int, default=5)

    status = subparsers.add_parser("status")
    status.add_argument("--runtime-dir", type=Path, default=_default_runtime_dir())

    args = parser.parse_args()
    if args.command == "prepare":
        result = prepare_runtime(args.runtime_dir)
    elif args.command == "start":
        result = start_server(
            runtime_dir=args.runtime_dir,
            port=args.port,
            context_path=args.context_path,
            timeout_seconds=args.timeout_seconds,
            prepare=not args.no_prepare,
        )
    elif args.command == "stop":
        result = stop_server(args.runtime_dir)
    elif args.command == "probe":
        result = probe_server(args.runtime_dir, args.port, args.context_path, args.timeout_seconds)
    elif args.command == "status":
        result = status_server(args.runtime_dir)
    else:
        raise AssertionError(args.command)
    print(json.dumps(result, indent=2, sort_keys=True))
    return 0 if result["status"] in {"ready", "running", "stopped"} else 2


def prepare_runtime(runtime_dir: Path) -> dict[str, Any]:
    """Download the WAR, Tomcat and compatibility jars, then unpack Tomcat.

    Each artifact is copied into ``runtime_dir`` by invoking the Maven
    dependency plugin. Any failure (Maven cannot be launched, times out,
    exits non-zero, or the Tomcat archive cannot be unpacked) is reported as
    a ``blocked`` summary instead of an exception.

    Args:
        runtime_dir: Directory that receives the downloaded artifacts.

    Returns:
        A summary dict whose ``status`` is ``ready`` or ``blocked``.
    """
    runtime_dir = runtime_dir.resolve()
    runtime_dir.mkdir(parents=True, exist_ok=True)
    maven = _maven_executable()
    pom_path = _extension_root() / "runtime" / "opencmis-tck" / "pom.xml"
    env = _tool_env()
    downloads: list[dict[str, Any]] = []
    for coordinate in [SERVER_WAR_COORDINATE, TOMCAT_COORDINATE, *JAVAEE_COMPAT_COORDINATES]:
        try:
            completed = subprocess.run(
                [
                    str(maven),
                    "-q",
                    "-f",
                    str(pom_path),
                    "org.apache.maven.plugins:maven-dependency-plugin:3.7.1:copy",
                    f"-Dartifact={coordinate}",
                    f"-DoutputDirectory={runtime_dir}",
                ],
                capture_output=True,
                text=True,
                timeout=300,
                check=False,
                env=env,
            )
        except (OSError, subprocess.TimeoutExpired) as exc:
            # Missing executable or a hung download: report it like any other
            # resolution failure rather than crashing with a traceback.
            downloads.append({"coordinate": coordinate, "error": str(exc)})
            return _summary(
                runtime_dir,
                "blocked",
                diagnostics=[
                    {
                        "severity": "error",
                        "field": "runtime.maven",
                        "message": f"Maven could not be run for {coordinate}: {exc}",
                    }
                ],
                downloads=downloads,
            )
        downloads.append(
            {
                "coordinate": coordinate,
                "returncode": completed.returncode,
                "stdout": completed.stdout[-2000:],
                "stderr": completed.stderr[-4000:],
            }
        )
        if completed.returncode != 0:
            return _summary(
                runtime_dir,
                "blocked",
                diagnostics=[
                    {
                        "severity": "error",
                        "field": "runtime.maven",
                        "message": f"Maven could not resolve {coordinate}.",
                    }
                ],
                downloads=downloads,
            )
    try:
        tomcat_home = _prepare_tomcat(runtime_dir)
        _install_compat_libs(runtime_dir, tomcat_home)
    except (ValueError, OSError, tarfile.TarError) as exc:
        return _summary(
            runtime_dir,
            "blocked",
            diagnostics=[
                {
                    "severity": "error",
                    "field": "runtime.tomcat",
                    "message": f"Tomcat runtime could not be prepared: {exc}",
                }
            ],
            downloads=downloads,
        )
    return _summary(runtime_dir, "ready", downloads=downloads)


def start_server(
    runtime_dir: Path,
    port: int,
    context_path: str,
    timeout_seconds: int,
    prepare: bool = True,
) -> dict[str, Any]:
    """Deploy the WAR into Tomcat, launch it and wait until the probe succeeds.

    Args:
        runtime_dir: Directory holding the prepared artifacts.
        port: HTTP port Tomcat should listen on.
        context_path: Web application context path.
        timeout_seconds: How long to wait for the server to answer the probe.
        prepare: When true, run :func:`prepare_runtime` first.

    Returns:
        A summary dict with status ``running`` on success, otherwise
        ``blocked`` (or the unsuccessful prepare summary).
    """
    runtime_dir = runtime_dir.resolve()
    if prepare:
        prepared = prepare_runtime(runtime_dir)
        if prepared["status"] != "ready":
            return prepared
    war = _server_war(runtime_dir)
    tomcat_home = _tomcat_home(runtime_dir)
    if war is None or tomcat_home is None:
        return _summary(
            runtime_dir,
            "blocked",
            diagnostics=[
                {
                    "severity": "error",
                    "field": "runtime.artifacts",
                    "message": "OpenCMIS in-memory WAR or Tomcat runtime is missing; run prepare first.",
                }
            ],
        )
    existing = status_server(runtime_dir)
    if existing["status"] == "running":
        return existing

    logs_dir = runtime_dir / "logs"
    logs_dir.mkdir(parents=True, exist_ok=True)
    log_path = logs_dir / "opencmis-inmemory.log"
    state_path = runtime_dir / "server-state.json"
    _deploy_war(tomcat_home, war, context_path, port)
    command = [
        str(tomcat_home / "bin" / "catalina.sh"),
        "run",
    ]
    # The child receives its own copy of the descriptor, so the parent's
    # handle is closed as soon as the process has been spawned.
    with log_path.open("ab") as log_handle:
        process = subprocess.Popen(
            command,
            cwd=tomcat_home,
            stdout=log_handle,
            stderr=subprocess.STDOUT,
            start_new_session=True,
            env=_tool_env(),
        )
    state = {
        "pid": process.pid,
        "command": command,
        "port": port,
        "context_path": _normalized_context_path(context_path),
        "container": "tomcat",
        "tomcat_home": str(tomcat_home),
        "browser_url": _browser_url(port, context_path),
        "log_path": str(log_path),
        "started_at": _now(),
    }
    state_path.write_text(json.dumps(state, indent=2, sort_keys=True) + "\n", encoding="utf-8")

    # Monotonic clock: the deadline must not move if the wall clock is adjusted.
    deadline = time.monotonic() + timeout_seconds
    probe_result: dict[str, Any] = {"status": "blocked"}
    while time.monotonic() < deadline:
        if process.poll() is not None:
            return _summary(
                runtime_dir,
                "blocked",
                server=state,
                diagnostics=[
                    {
                        "severity": "error",
                        "field": "server.process",
                        "message": f"OpenCMIS in-memory server exited with {process.returncode}.",
                    }
                ],
                log_tail=_read_tail(log_path),
            )
        probe_result = probe_server(runtime_dir, port, context_path, timeout_seconds=5)
        if probe_result["status"] == "ready":
            return _summary(runtime_dir, "running", server=state, probe=probe_result)
        time.sleep(1)

    stop_server(runtime_dir)
    return _summary(
        runtime_dir,
        "blocked",
        server=state,
        probe=probe_result,
        diagnostics=[
            {
                "severity": "error",
                "field": "server.probe",
                "message": f"OpenCMIS in-memory server did not become ready within {timeout_seconds} seconds.",
            }
        ],
        log_tail=_read_tail(log_path),
    )


def stop_server(runtime_dir: Path) -> dict[str, Any]:
    """Terminate the recorded server process group and remove the state file.

    Sends SIGTERM, waits up to 20 seconds, then sends SIGKILL if the process
    is still alive.

    Args:
        runtime_dir: Directory containing ``server-state.json``.

    Returns:
        A summary dict with status ``stopped``.
    """
    runtime_dir = runtime_dir.resolve()
    state = _load_state(runtime_dir)
    if not state:
        return _summary(runtime_dir, "stopped")
    pid = state.get("pid")
    if isinstance(pid, int) and _is_process_running(pid):
        _terminate_process_group(pid, signal.SIGTERM)
        deadline = time.monotonic() + 20
        while time.monotonic() < deadline and _is_process_running(pid):
            time.sleep(0.5)
        if _is_process_running(pid):
            _terminate_process_group(pid, signal.SIGKILL)
    (runtime_dir / "server-state.json").unlink(missing_ok=True)
    return _summary(runtime_dir, "stopped", server=state)


def probe_server(
    runtime_dir: Path,
    port: int,
    context_path: str,
    timeout_seconds: int = 5,
) -> dict[str, Any]:
    """Request the candidate URLs until one of them returns a JSON body.

    The first JSON response is saved to ``browser-probe.json`` inside
    ``runtime_dir``.

    Args:
        runtime_dir: Directory that receives the probe response and summary.
        port: Port the server listens on.
        context_path: Web application context path.
        timeout_seconds: Per-request timeout.

    Returns:
        A summary dict with status ``ready`` when a JSON response was
        received, otherwise ``blocked`` with the list of attempts.
    """
    runtime_dir = runtime_dir.resolve()
    output_path = runtime_dir / "browser-probe.json"
    attempts: list[dict[str, Any]] = []
    for url in _probe_urls(port, context_path):
        request = urllib.request.Request(
            url,
            headers={"Accept": "application/json, */*;q=0.1"},
        )
        try:
            with urllib.request.urlopen(request, timeout=timeout_seconds) as response:
                body = response.read(1024 * 1024)
                http_status = response.status
                content_type = response.headers.get("Content-Type")
        except urllib.error.HTTPError as exc:
            error_body = exc.read(64 * 1024)
            attempts.append(
                {
                    "url": url,
                    "http_status": exc.code,
                    "content_type": exc.headers.get("Content-Type"),
                    "body_preview": _body_preview(error_body),
                }
            )
            continue
        except (OSError, http.client.HTTPException, UnicodeDecodeError) as exc:
            # URLError and TimeoutError are OSError subclasses. Resets and
            # malformed responses raised while reading the reply are not
            # wrapped by urllib, so they are caught here as well; otherwise a
            # half-started server would abort the polling loop.
            attempts.append({"url": url, "error": str(exc)})
            continue

        parsed = _parse_json(body)
        attempts.append(
            {
                "url": url,
                "http_status": http_status,
                "content_type": content_type,
                "body_preview": _body_preview(body),
            }
        )
        if parsed is not None:
            output_path.parent.mkdir(parents=True, exist_ok=True)
            output_path.write_bytes(body)
            return _summary(
                runtime_dir,
                "ready",
                probe={
                    "url": url,
                    "http_status": http_status,
                    "repository_ids": _repository_ids(parsed),
                    "response_path": str(output_path),
                    "attempts": attempts,
                },
            )
    return _summary(runtime_dir, "blocked", probe={"attempts": attempts})


def status_server(runtime_dir: Path) -> dict[str, Any]:
    """Report whether the process recorded in the state file is still alive.

    Args:
        runtime_dir: Directory containing ``server-state.json``.

    Returns:
        A summary dict with status ``running`` or ``stopped``.
    """
    runtime_dir = runtime_dir.resolve()
    state = _load_state(runtime_dir)
    if not state:
        return _summary(runtime_dir, "stopped")
    pid = state.get("pid")
    running = isinstance(pid, int) and _is_process_running(pid)
    return _summary(runtime_dir, "running" if running else "stopped", server=state)


def _summary(runtime_dir: Path, status: str, **extra: Any) -> dict[str, Any]:
    """Build the summary dict, persist it as ``inmemory-summary.json`` and return it.

    Keyword arguments whose value is ``None`` are left out of the summary.
    """
    summary = {
        "id": "opencmis-inmemory-server",
        "status": status,
        "created_at": _now(),
        "runtime_dir": str(runtime_dir.resolve()),
        "server_war_coordinate": SERVER_WAR_COORDINATE,
        "tomcat_coordinate": TOMCAT_COORDINATE,
        # Copy so callers mutating the summary cannot alter the module constant.
        "javaee_compat_coordinates": list(JAVAEE_COMPAT_COORDINATES),
        "default_repository_id": "A1",
        "default_browser_url": _browser_url(DEFAULT_PORT, DEFAULT_CONTEXT_PATH),
    }
    summary.update({key: value for key, value in extra.items() if value is not None})
    # status/stop/probe may run against a directory that was never prepared.
    runtime_dir.mkdir(parents=True, exist_ok=True)
    (runtime_dir / "inmemory-summary.json").write_text(
        json.dumps(summary, indent=2, sort_keys=True) + "\n",
        encoding="utf-8",
    )
    return summary


def _repository_ids(value: Any) -> list[str]:
    """Extract repository ids from a parsed probe response.

    A dict with a ``repositoryId`` key yields that single id. Any other dict
    yields, for each dict-valued entry, its ``repositoryId`` or else its key,
    sorted. Anything else yields an empty list.
    """
    if isinstance(value, dict) and "repositoryId" in value:
        return [str(value["repositoryId"])]
    if isinstance(value, dict):
        return sorted(
            str(item.get("repositoryId", key))
            for key, item in value.items()
            if isinstance(item, dict)
        )
    return []


def _parse_json(body: bytes) -> Any:
    """Decode ``body`` as UTF-8 JSON; return ``None`` when it is not valid JSON."""
    try:
        return json.loads(body.decode("utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError):
        return None


def _body_preview(body: bytes, limit: int = 500) -> str:
    """Return the first ``limit`` bytes of ``body`` decoded leniently as UTF-8."""
    return body[:limit].decode("utf-8", errors="replace")


def _probe_urls(port: int, context_path: str) -> list[str]:
    """Return the URLs tried by :func:`probe_server`, in order."""
    base = f"http://127.0.0.1:{port}{_normalized_context_path(context_path)}"
    return [
        f"{base}/browser",
        f"{base}/browser/",
        f"{base}/cmis-endpoints.json",
        f"{base}/",
    ]


def _server_war(runtime_dir: Path) -> Path | None:
    """Return the last (by sorted name) downloaded server WAR, or ``None``."""
    matches = sorted(runtime_dir.glob("chemistry-opencmis-server-inmemory-*.war"))
    return matches[-1] if matches else None


def _tomcat_archive(runtime_dir: Path) -> Path | None:
    """Return the last (by sorted name) downloaded Tomcat archive, or ``None``."""
    matches = sorted(runtime_dir.glob("tomcat-*.tar.gz"))
    return matches[-1] if matches else None


def _tomcat_home(runtime_dir: Path) -> Path | None:
    """Return the last (by sorted name) extracted Tomcat directory, or ``None``."""
    candidates = sorted((runtime_dir / "containers").glob("apache-tomcat-*"))
    return candidates[-1] if candidates else None


def _prepare_tomcat(runtime_dir: Path) -> Path:
    """Extract the Tomcat archive into ``containers/`` unless already extracted.

    Raises:
        ValueError: If the archive is missing or contains no
            ``apache-tomcat-*`` directory.
    """
    existing = _tomcat_home(runtime_dir)
    if existing is not None:
        return existing
    archive = _tomcat_archive(runtime_dir)
    if archive is None:
        raise ValueError("Tomcat archive is missing")
    containers_dir = runtime_dir / "containers"
    containers_dir.mkdir(parents=True, exist_ok=True)
    with tarfile.open(archive, "r:gz") as handle:
        _safe_extract(handle, containers_dir)
    tomcat_home = _tomcat_home(runtime_dir)
    if tomcat_home is None:
        raise ValueError("Tomcat archive did not extract an apache-tomcat-* directory")
    return tomcat_home


def _install_compat_libs(runtime_dir: Path, tomcat_home: Path) -> None:
    """Copy the downloaded Java EE compatibility jars into Tomcat's ``lib`` directory."""
    lib_dir = tomcat_home / "lib"
    for artifact_id in ["jaxws-api", "javax.jws-api", "javax.xml.soap-api", "javax.annotation-api"]:
        matches = sorted(runtime_dir.glob(f"{artifact_id}-*.jar"))
        if matches:
            shutil.copy2(matches[-1], lib_dir / matches[-1].name)


def _deploy_war(tomcat_home: Path, war: Path, context_path: str, port: int) -> None:
    """Install the WAR under ``webapps`` and point Tomcat's connector at ``port``.

    Any previous deployment for the same context name is removed first. An
    empty context path deploys as ``ROOT``.
    """
    context_name = _normalized_context_path(context_path).strip("/") or "ROOT"
    webapps_dir = tomcat_home / "webapps"
    _remove_generated_webapp(webapps_dir / context_name)
    _remove_generated_webapp(webapps_dir / f"{context_name}.war")
    shutil.copy2(war, webapps_dir / f"{context_name}.war")
    server_xml = tomcat_home / "conf" / "server.xml"
    text = server_xml.read_text(encoding="utf-8")
    text = reconfigure_connector_port(text, port)
    server_xml.write_text(text, encoding="utf-8")
    (tomcat_home / "bin" / "catalina.sh").chmod(0o755)


def reconfigure_connector_port(server_xml: str, port: int) -> str:
    """Return ``server_xml`` with the HTTP connector listening on ``port``.

    The first ``<Connector ...>`` element outside an XML comment that declares
    ``protocol="HTTP/1.1"`` gets its ``port`` attribute rewritten, whatever
    its current value. That keeps the operation repeatable: a file already
    rewritten to one port can be rewritten to another. When no such element
    is found, the first literal ``port="8080"`` is replaced instead.

    Args:
        server_xml: Contents of Tomcat's ``conf/server.xml``.
        port: Port number to configure.

    Returns:
        The updated XML text.
    """
    comment_spans = [found.span() for found in _XML_COMMENT.finditer(server_xml)]
    for tag in _CONNECTOR_TAG.finditer(server_xml):
        if any(start <= tag.start() < end for start, end in comment_spans):
            continue  # sample connectors that are commented out
        if 'protocol="HTTP/1.1"' not in tag.group(0):
            continue
        updated, replaced = _PORT_ATTRIBUTE.subn(
            lambda found: f"{found.group(1)}{port}{found.group(2)}",
            tag.group(0),
            count=1,
        )
        if replaced:
            return server_xml[: tag.start()] + updated + server_xml[tag.end():]
    return server_xml.replace('port="8080"', f'port="{port}"', 1)


def _remove_generated_webapp(path: Path) -> None:
    """Delete a deployed webapp file or directory if it exists.

    Raises:
        ValueError: If ``path`` resolves to a location outside its own parent
            directory (for example through a symlink).
    """
    if not path.exists():
        return
    webapps_dir = path.parent.resolve()
    resolved = path.resolve()
    try:
        resolved.relative_to(webapps_dir)
    except ValueError as exc:
        raise ValueError(f"Refusing to remove path outside Tomcat webapps: {path}") from exc
    if resolved.is_dir():
        shutil.rmtree(resolved)
    else:
        resolved.unlink()


def _safe_extract(handle: tarfile.TarFile, destination: Path) -> None:
    """Extract ``handle`` into ``destination`` after rejecting escaping members.

    Raises:
        ValueError: If a member name resolves outside ``destination``.
    """
    destination = destination.resolve()
    for member in handle.getmembers():
        target = (destination / member.name).resolve()
        try:
            target.relative_to(destination)
        except ValueError as exc:
            raise ValueError(f"Archive member escapes destination: {member.name}") from exc
    if hasattr(tarfile, "data_filter"):
        # The name check above does not look at link targets; the "data"
        # filter additionally refuses links and special files that would
        # reach outside the destination. Not every interpreter has it.
        handle.extractall(destination, filter="data")
    else:
        handle.extractall(destination)


def _load_state(runtime_dir: Path) -> dict[str, Any] | None:
    """Read ``server-state.json``; return ``None`` if absent, unreadable or not a JSON object."""
    state_path = runtime_dir / "server-state.json"
    try:
        value = json.loads(state_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # Missing file, unreadable file, bad encoding or malformed JSON.
        return None
    return value if isinstance(value, dict) else None


def _is_process_running(pid: int) -> bool:
    """Return whether a process with ``pid`` exists (signal 0 probe)."""
    if pid <= 0:
        # 0 and negative values address process groups in kill(2); never
        # treat them as a server pid.
        return False
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # The process exists but belongs to someone else.
        return True
    return True


def _terminate_process_group(pid: int, sig: signal.Signals) -> None:
    """Send ``sig`` to the process group ``pid``, falling back to the single process."""
    try:
        os.killpg(pid, sig)
    except ProcessLookupError:
        return
    except PermissionError:
        try:
            os.kill(pid, sig)
        except ProcessLookupError:
            # The process went away between the two calls.
            return


def _read_tail(path: Path, limit: int = 8000) -> str:
    """Return up to the last ``limit`` bytes of ``path`` decoded leniently as UTF-8."""
    if limit <= 0 or not path.exists():
        return ""
    with path.open("rb") as handle:
        # Seek instead of loading the whole (possibly large) log file.
        size = handle.seek(0, os.SEEK_END)
        handle.seek(max(0, size - limit))
        data = handle.read()
    return data.decode("utf-8", errors="replace")


def _java_executable() -> Path:
    """Return the bundled ``java`` binary if present, else plain ``java``."""
    local = _extension_root() / ".local" / "toolchains" / "current-jdk" / "bin" / "java"
    return local if local.exists() else Path("java")


def _maven_executable() -> Path:
    """Return the bundled ``mvn`` binary if present, else plain ``mvn``."""
    local = _extension_root() / ".local" / "toolchains" / "current-maven" / "bin" / "mvn"
    return local if local.exists() else Path("mvn")


def _tool_env() -> dict[str, str]:
    """Return a copy of the environment with the bundled JDK/Maven, if any, put first on PATH."""
    env = os.environ.copy()
    local_jdk = _extension_root() / ".local" / "toolchains" / "current-jdk"
    local_maven = _extension_root() / ".local" / "toolchains" / "current-maven"
    path_parts = []
    if local_jdk.exists():
        env["JAVA_HOME"] = str(local_jdk)
        path_parts.append(str(local_jdk / "bin"))
    if local_maven.exists():
        env["MAVEN_HOME"] = str(local_maven)
        path_parts.append(str(local_maven / "bin"))
    path_parts.append(env.get("PATH", ""))
    env["PATH"] = os.pathsep.join(part for part in path_parts if part)
    return env


def _browser_url(port: int, context_path: str) -> str:
    """Return the browser-binding URL for the given port and context path."""
    return f"http://127.0.0.1:{port}{_normalized_context_path(context_path)}/browser"


def _normalized_context_path(context_path: str) -> str:
    """Return ``context_path`` with exactly one leading slash, or ``""`` for the root."""
    value = "/" + context_path.strip("/")
    return "" if value == "/" else value


def _default_runtime_dir() -> Path:
    """Return the default runtime directory under the extension root."""
    return _extension_root() / ".local" / "opencmis-inmemory"


def _extension_root() -> Path:
    """Return the directory one level above the directory containing this script."""
    return Path(__file__).resolve().parents[1]


def _now() -> str:
    """Return the current UTC time as an ISO 8601 string."""
    return datetime.now(timezone.utc).isoformat()


if __name__ == "__main__":
    raise SystemExit(main())