Files
open-cmis-tck/adapters/opencmis_console_adapter.py
2026-05-08 01:59:42 +02:00

390 lines
13 KiB
Python

#!/usr/bin/env python3
"""Adapter for Apache Chemistry OpenCMIS TCK ConsoleRunner."""
from __future__ import annotations
import argparse
import json
import os
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
from open_cmis_tck.normalization import ( # noqa: E402
aggregate_case_result,
parse_text_report,
result_counts,
)
# Maps each supported ``--check-group`` name to the fully qualified OpenCMIS TCK
# test group classes that are written to the groups file for that run.
# A check group that is absent from this mapping is reported as "blocked" by
# run_console_adapter; a check group mapped to an empty list is reported as
# "skipped" without invoking the ConsoleRunner.
GROUP_CLASSES = {
    "repository-type": [
        "org.apache.chemistry.opencmis.tck.tests.basics.BasicsTestGroup",
        "org.apache.chemistry.opencmis.tck.tests.types.TypesTestGroup",
    ],
    "object-content": [
        "org.apache.chemistry.opencmis.tck.tests.crud.CRUDTestGroup",
    ],
    "navigation": [
        "org.apache.chemistry.opencmis.tck.tests.filing.FilingTestGroup",
    ],
    "query-acl-versioning": [
        "org.apache.chemistry.opencmis.tck.tests.query.QueryTestGroup",
        "org.apache.chemistry.opencmis.tck.tests.control.ControlTestGroup",
        "org.apache.chemistry.opencmis.tck.tests.versioning.VersioningTestGroup",
    ],
    # Same test group class as "object-content".
    "relationships": [
        "org.apache.chemistry.opencmis.tck.tests.crud.CRUDTestGroup",
    ],
    # Same ControlTestGroup class that "query-acl-versioning" also runs.
    "change-log": [
        "org.apache.chemistry.opencmis.tck.tests.control.ControlTestGroup",
    ],
    # No TCK classes mapped: the adapter returns a "skipped" result for this group.
    "extension-gaps": [],
}
def main() -> int:
    """Command-line entry point for the ConsoleRunner adapter.

    Parses the process arguments, runs the adapter, prints the resulting
    dictionary as sorted, indented JSON on stdout, and maps the result status
    to a process exit code.

    Returns:
        0 when the result status is one of ``pass``, ``skipped``, ``warning``,
        ``expected_gap`` or ``unsupported_by_design``; 1 for any other status.
    """
    cli = argparse.ArgumentParser()
    # The three mandatory plain-string options share the same configuration.
    for required_flag in ("--browser-url", "--repository-id", "--check-group"):
        cli.add_argument(required_flag, required=True)
    cli.add_argument("--artifact-dir", type=Path, required=True)
    cli.add_argument("--run-dir", type=Path)
    # Default extension path: the directory one level above this file's directory.
    default_extension_path = Path(__file__).resolve().parents[1]
    cli.add_argument("--extension-path", type=Path, default=default_extension_path)
    cli.add_argument("--credentials-ref", default="")
    cli.add_argument("--target-profile-dir", type=Path)
    cli.add_argument("--user")
    cli.add_argument("--password")
    cli.add_argument("--maven-executable", default="mvn")
    cli.add_argument("--timeout-seconds", type=int, default=300)
    cli.add_argument("--dry-run", action="store_true")

    outcome = run_console_adapter(cli.parse_args())
    print(json.dumps(outcome, indent=2, sort_keys=True))

    # Statuses that must not fail the calling process.
    ok_statuses = (
        "pass",
        "skipped",
        "warning",
        "expected_gap",
        "unsupported_by_design",
    )
    if outcome["result"] in ok_statuses:
        return 0
    return 1
def run_console_adapter(args: argparse.Namespace) -> dict[str, Any]:
    """Run one OpenCMIS TCK check group through the ConsoleRunner and normalize the outcome.

    The function resolves the check group to its TCK classes, loads credentials,
    writes the session/groups/invocation artifacts into ``args.artifact_dir``,
    invokes Maven (unless the group has no classes or ``args.dry_run`` is set),
    stores the captured stdout/stderr, and returns a normalized result built by
    ``_result``.

    Args:
        args: Parsed command-line namespace. The attributes read here are
            ``artifact_dir``, ``check_group``, ``run_dir``, ``dry_run`` and
            ``timeout_seconds``; helpers read further attributes.

    Returns:
        The result dictionary produced by ``_result``. Its status is
        ``blocked`` for an unknown check group or unavailable credentials,
        ``skipped`` for an empty group or a dry run, ``infrastructure_error``
        when the runner times out, cannot be started, or exits non-zero without
        parsable cases, and otherwise the aggregated case status (``pass`` when
        the runner exits 0 without parsable cases).
    """
    artifact_dir = args.artifact_dir.resolve()
    artifact_dir.mkdir(parents=True, exist_ok=True)
    private_session_path = artifact_dir / "session-private.properties"
    redacted_session_path = artifact_dir / "session.properties.redacted"
    groups_path = artifact_dir / "groups.txt"
    invocation_path = artifact_dir / "console-runner-invocation.json"
    console_stdout = artifact_dir / "console-runner-stdout.txt"
    console_stderr = artifact_dir / "console-runner-stderr.txt"

    group_classes = GROUP_CLASSES.get(args.check_group)
    if group_classes is None:
        return _result(
            "blocked",
            [f"Unsupported OpenCMIS TCK check group: {args.check_group}."],
            args,
            [],
            args.run_dir,
            artifact_dir,
            [],
        )

    credentials = _load_credentials(args)
    if credentials["status"] == "blocked":
        return _result(
            "blocked",
            credentials["observations"],
            args,
            group_classes,
            args.run_dir,
            artifact_dir,
            [],
            extra_facts={"blocked_reason": "credentials_unavailable"},
        )

    # Everything from the moment the private session file (which may hold the
    # password) is written runs under one try/finally, so the file is removed
    # on every exit path, including unexpected exceptions.
    try:
        _write_session_parameters(private_session_path, redacted_session_path, args, credentials)
        _write_groups(groups_path, group_classes)
        command = _maven_command(args, private_session_path, groups_path)
        invocation = {
            "created_at": _now(),
            "command": command,
            "check_group": args.check_group,
            "group_classes": group_classes,
            "session_parameters": str(redacted_session_path),
            "private_session_parameters": str(private_session_path),
            "groups_file": str(groups_path),
            "auth_mode": credentials["auth_mode"],
        }
        invocation_path.write_text(
            json.dumps(invocation, indent=2, sort_keys=True) + "\n",
            encoding="utf-8",
        )
        artifact_refs = [
            str(redacted_session_path.relative_to(artifact_dir)),
            str(groups_path.relative_to(artifact_dir)),
            str(invocation_path.relative_to(artifact_dir)),
        ]

        if not group_classes:
            return _result(
                "skipped",
                [f"No OpenCMIS TCK group classes are mapped for {args.check_group}; treated as a known-gap review group."],
                args,
                group_classes,
                args.run_dir,
                artifact_dir,
                artifact_refs,
            )

        if args.dry_run:
            return _result(
                "skipped",
                ["Dry run completed; OpenCMIS TCK ConsoleRunner was not invoked."],
                args,
                group_classes,
                args.run_dir,
                artifact_dir,
                artifact_refs,
            )

        try:
            completed = subprocess.run(
                command,
                capture_output=True,
                text=True,
                timeout=args.timeout_seconds,
                check=False,
            )
        except subprocess.TimeoutExpired as exc:
            # Keep whatever output was captured before the timeout; depending on
            # the platform it may be bytes even though text=True was requested.
            for target, partial in ((console_stdout, exc.stdout), (console_stderr, exc.stderr)):
                if partial is None:
                    continue
                if isinstance(partial, bytes):
                    partial = partial.decode("utf-8", errors="replace")
                target.write_text(partial, encoding="utf-8")
                artifact_refs.append(str(target.relative_to(artifact_dir)))
            return _result(
                "infrastructure_error",
                [
                    f"OpenCMIS TCK ConsoleRunner timed out after {args.timeout_seconds} seconds for {args.check_group}."
                ],
                args,
                group_classes,
                args.run_dir,
                artifact_dir,
                artifact_refs,
                extra_facts={"infrastructure_error_reason": "timeout"},
            )
        except OSError as exc:
            # Raised when the Maven executable is missing or cannot be executed.
            return _result(
                "infrastructure_error",
                [f"OpenCMIS TCK ConsoleRunner could not be started for {args.check_group}: {exc}."],
                args,
                group_classes,
                args.run_dir,
                artifact_dir,
                artifact_refs,
                extra_facts={"infrastructure_error_reason": "runner_not_started"},
            )
    finally:
        private_session_path.unlink(missing_ok=True)

    console_stdout.write_text(completed.stdout, encoding="utf-8")
    console_stderr.write_text(completed.stderr, encoding="utf-8")
    artifact_refs.extend(
        [
            str(console_stdout.relative_to(artifact_dir)),
            str(console_stderr.relative_to(artifact_dir)),
        ]
    )

    cases = parse_text_report(completed.stdout, args.check_group, group_classes)
    if cases:
        status = aggregate_case_result(result_counts(cases), completed.returncode)
    elif completed.returncode == 0:
        # No cases were parsed but the runner reported success.
        status = "pass"
    else:
        status = "infrastructure_error"

    return _result(
        status,
        _console_observations(completed.returncode, args.check_group, cases),
        args,
        group_classes,
        args.run_dir,
        artifact_dir,
        artifact_refs,
        cases=cases,
        returncode=completed.returncode,
    )
def _write_session_parameters(
    private_path: Path,
    redacted_path: Path,
    args: argparse.Namespace,
    credentials: dict[str, Any],
) -> None:
    """Write the OpenCMIS session parameter files for one ConsoleRunner invocation.

    Both files contain the same ``key=value`` lines (browser binding, URL,
    repository id, succinct/compression/cookies flags, and the user/password
    when they are non-empty strings). ``private_path`` receives the real
    values; ``redacted_path`` receives the same lines with the password value
    replaced by ``<redacted>``.

    Args:
        private_path: Destination of the file holding the real credentials.
            It is recreated with owner-only permissions (0o600).
        redacted_path: Destination of the shareable, password-free copy.
        args: Namespace providing ``browser_url`` and ``repository_id``.
        credentials: Mapping that may contain ``user`` and ``password``.
    """
    values = {
        "org.apache.chemistry.opencmis.binding.spi.type": "browser",
        "org.apache.chemistry.opencmis.binding.browser.url": args.browser_url,
        "org.apache.chemistry.opencmis.session.repository.id": args.repository_id,
        "org.apache.chemistry.opencmis.binding.browser.succinct": "true",
        "org.apache.chemistry.opencmis.binding.compression": "true",
        "org.apache.chemistry.opencmis.binding.cookies": "true",
    }
    user = credentials.get("user")
    password = credentials.get("password")
    if isinstance(user, str) and user:
        values["org.apache.chemistry.opencmis.user"] = user
    if isinstance(password, str) and password:
        values["org.apache.chemistry.opencmis.password"] = password

    private_text = "\n".join(f"{key}={value}" for key, value in values.items()) + "\n"
    # The private file can contain the password, so it must not inherit the
    # default (umask-dependent, often world-readable) permissions. Removing any
    # stale file first guarantees the 0o600 creation mode is actually applied.
    private_path.unlink(missing_ok=True)
    open_flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
    # On Windows a descriptor opened without O_BINARY translates newlines a
    # second time underneath Python's own text layer.
    open_flags |= getattr(os, "O_BINARY", 0)
    descriptor = os.open(private_path, open_flags, 0o600)
    with os.fdopen(descriptor, "w", encoding="utf-8") as handle:
        handle.write(private_text)

    redacted_values = dict(values)
    if "org.apache.chemistry.opencmis.password" in redacted_values:
        redacted_values["org.apache.chemistry.opencmis.password"] = "<redacted>"
    redacted_path.write_text(
        "\n".join(f"{key}={value}" for key, value in redacted_values.items()) + "\n",
        encoding="utf-8",
    )
def _load_credentials(args: argparse.Namespace) -> dict[str, Any]:
    """Resolve the credentials to use for the TCK session.

    Precedence: explicit ``--user``/``--password`` arguments win; otherwise the
    stripped ``credentials_ref`` decides. An empty reference means anonymous
    access, ``env:`` and ``file:`` references are delegated to their loaders,
    and anything else yields a ``blocked`` status.

    Returns:
        A dictionary with at least ``status`` and ``auth_mode``; ``user`` and
        ``password`` when credentials are available, or ``observations`` when
        the status is ``blocked``.
    """
    given_on_argv = not (args.user is None and args.password is None)
    if given_on_argv:
        argv_credentials: dict[str, Any] = {"status": "available", "auth_mode": "argv"}
        argv_credentials["user"] = args.user
        argv_credentials["password"] = args.password
        return argv_credentials

    reference = (args.credentials_ref or "").strip()
    if reference == "":
        return {"status": "available", "auth_mode": "anonymous"}

    # The scheme is whatever precedes the first colon, if there is one.
    scheme, colon, _ = reference.partition(":")
    if colon and scheme == "env":
        return _load_env_credentials(reference)
    if colon and scheme == "file":
        return _load_file_credentials(reference, args.target_profile_dir)

    message = "Unsupported credentials_ref. Use env:USER_VAR,PASSWORD_VAR or file:/path/to/credentials.json."
    return {
        "status": "blocked",
        "auth_mode": "unknown",
        "observations": [message],
    }
def _load_env_credentials(credentials_ref: str) -> dict[str, Any]:
    """Load the user and password from two environment variables.

    Args:
        credentials_ref: Reference of the form ``env:USER_VAR,PASSWORD_VAR``.
            Whitespace around either variable name is ignored, so
            ``env:USER_VAR, PASSWORD_VAR`` is accepted as well.

    Returns:
        ``{"status": "available", "auth_mode": "env", "user": ..., "password": ...}``
        when both variables are set to non-empty values; otherwise a
        ``blocked`` dictionary whose ``observations`` explain the malformed
        reference or name the missing variable(s).
    """
    # Strip each name: a space after the comma would otherwise be looked up as
    # part of the variable name and be reported as a missing variable.
    names = [name.strip() for name in credentials_ref.removeprefix("env:").split(",", 1)]
    if len(names) != 2 or not all(names):
        return {
            "status": "blocked",
            "auth_mode": "env",
            "observations": [
                "Environment credentials_ref must be env:USER_VAR,PASSWORD_VAR."
            ],
        }
    user_var, password_var = names
    user = os.environ.get(user_var)
    password = os.environ.get(password_var)
    # An unset variable and an empty value are both treated as missing.
    missing = [name for name, value in ((user_var, user), (password_var, password)) if not value]
    if missing:
        return {
            "status": "blocked",
            "auth_mode": "env",
            "observations": [
                "Missing credential environment variable(s): " + ", ".join(missing) + "."
            ],
        }
    return {"status": "available", "auth_mode": "env", "user": user, "password": password}
def _load_file_credentials(
credentials_ref: str,
target_profile_dir: Path | None,
) -> dict[str, Any]:
raw_path = credentials_ref.removeprefix("file:")
path = Path(raw_path).expanduser()
if not path.is_absolute() and target_profile_dir is not None:
path = target_profile_dir / path
if not path.exists():
return {
"status": "blocked",
"auth_mode": "file",
"observations": [f"Credential file does not exist: {path}."],
}
payload = json.loads(path.read_text(encoding="utf-8"))
user = payload.get("user")
password = payload.get("password")
if not isinstance(user, str) or not isinstance(password, str):
return {
"status": "blocked",
"auth_mode": "file",
"observations": [
"Credential file must contain string fields 'user' and 'password'."
],
}
return {"status": "available", "auth_mode": "file", "user": user, "password": password}
def _write_groups(path: Path, group_classes: list[str]) -> None:
    """Write the TCK group class names to ``path``, one per line.

    Every name is terminated by a newline; an empty list produces an empty file.
    """
    lines = [f"{class_name}\n" for class_name in group_classes]
    path.write_text("".join(lines), encoding="utf-8")
def _maven_command(args: argparse.Namespace, session_path: Path, groups_path: Path) -> list[str]:
    """Build the Maven argument list that launches the OpenCMIS TCK ConsoleRunner.

    Args:
        args: Namespace providing ``extension_path`` (the POM is expected at
            ``runtime/opencmis-tck/pom.xml`` below it) and ``maven_executable``.
        session_path: Session parameters file passed as the first runner argument.
        groups_path: Groups file passed as the second runner argument.

    Returns:
        The command as a list suitable for ``subprocess.run`` without a shell.
        The two runner arguments travel in a single ``-Dexec.args=`` property;
        a path containing whitespace or quote characters is wrapped in quotes
        so it is not split into several runner arguments.
    """
    pom_path = args.extension_path / "runtime" / "opencmis-tck" / "pom.xml"
    runner_args = " ".join(_quote_exec_arg(str(item)) for item in (session_path, groups_path))
    return [
        args.maven_executable,
        "-q",
        "-f",
        str(pom_path),
        "exec:java",
        "-Dexec.mainClass=org.apache.chemistry.opencmis.tck.runner.ConsoleRunner",
        f"-Dexec.args={runner_args}",
    ]


def _quote_exec_arg(value: str) -> str:
    """Quote one ``exec.args`` token only when it contains whitespace or quotes."""
    needs_quoting = any(char.isspace() or char in "\"'" for char in value)
    if not needs_quoting:
        # Leave ordinary paths untouched so existing invocations are unchanged.
        return value
    # Prefer double quotes; switch to single quotes only when the value itself
    # contains a double quote and no single quote.
    quote = "'" if '"' in value and "'" not in value else '"'
    return f"{quote}{value}{quote}"
def _result(
    status: str,
    observations: list[str],
    args: argparse.Namespace,
    group_classes: list[str],
    run_dir: Path | None,
    artifact_dir: Path,
    artifact_refs: list[str],
    cases: list[dict[str, Any]] | None = None,
    returncode: int | None = None,
    extra_facts: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Assemble the normalized adapter result dictionary.

    Args:
        status: Overall result status, stored under ``result``.
        observations: Human-readable notes, stored as given.
        args: Namespace providing ``check_group`` for the facts.
        group_classes: TCK classes recorded in the facts.
        run_dir: Optional run directory that artifact references are made
            relative to when possible.
        artifact_dir: Directory the entries of ``artifact_refs`` are relative to.
        artifact_refs: Artifact paths relative to ``artifact_dir``.
        cases: Parsed test cases; ``None`` or empty is reported as ``[]``.
        returncode: Runner exit code, or ``None`` when it was not run.
        extra_facts: Additional facts merged over the standard ones.

    Returns:
        A dictionary with ``result``, ``observations``, ``tests``, ``facts``
        and ``artifact_refs`` keys.
    """
    case_list = cases if cases else []
    tally = result_counts(case_list)
    if not tally:
        # Without per-case counts, count the overall status once.
        tally[status] = 1

    facts = {
        "adapter": "opencmis-console-runner",
        "artifact_dir": str(artifact_dir),
        "check_group": args.check_group,
        "group_classes": group_classes,
        "returncode": returncode,
        "result_counts": tally,
    }
    facts.update(extra_facts or {})

    resolved_refs = []
    for relative_ref in artifact_refs:
        resolved_refs.append(_artifact_ref(artifact_dir / relative_ref, run_dir, artifact_dir))

    return {
        "result": status,
        "observations": observations,
        "tests": case_list,
        "facts": facts,
        "artifact_refs": resolved_refs,
    }
def _console_observations(returncode: int, check_group: str, cases: list[dict[str, Any]]) -> list[str]:
    """Describe a finished ConsoleRunner invocation in human-readable sentences.

    Returns one sentence noting that nothing was parsed when ``cases`` is
    empty; otherwise the exit-code sentence followed by a summary of the
    per-status case counts.
    """
    if not cases:
        return [
            f"OpenCMIS TCK ConsoleRunner exited with {returncode} for {check_group}, but no TextReport cases were parsed."
        ]

    tally = result_counts(cases)
    exit_sentence = f"OpenCMIS TCK ConsoleRunner exited with {returncode} for {check_group}."
    rendered_counts = ", ".join(f"{key}: {value}" for key, value in tally.items())
    status_sentence = "Normalized OpenCMIS TextReport case statuses: " + rendered_counts + "."
    return [exit_sentence, status_sentence]
def _artifact_ref(path: Path, run_dir: Path | None, artifact_dir: Path) -> str:
resolved = path.resolve()
if run_dir is not None:
try:
return str(resolved.relative_to(run_dir.resolve()))
except ValueError:
pass
return str(resolved.relative_to(artifact_dir.resolve()))
def _now() -> str:
    """Return the current UTC time as a timezone-aware ISO 8601 string."""
    current_utc = datetime.now(tz=timezone.utc)
    return current_utc.isoformat()
# Script entry point: exit the process with the status code returned by main().
if __name__ == "__main__":
    sys.exit(main())