#!/usr/bin/env python3
"""OpenCMIS TCK wrapper boundary.

The wrapper owns extension-local orchestration only: dependency checks,
optional user-supplied TCK command execution, raw artifact capture, and
normalization into the guide-board runner result contract.
"""

from __future__ import annotations

import argparse
import json
import math
import os
import re
import shlex
import shutil
import subprocess
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Any

# Fallback used when no usable timeout is configured in the runtime policy.
_DEFAULT_TIMEOUT_SECONDS = 300.0

# Matches ``{name}`` placeholders in command template arguments.
_PLACEHOLDER_PATTERN = re.compile(r"\{([a-z_]+)\}")

# Status spellings that are rewritten to a canonical status.
_STATUS_ALIASES = {
    "ok": "pass",
    "success": "pass",
    "passed": "pass",
    "failure": "fail",
    "failed": "fail",
    "error": "fail",
    "skip": "skipped",
    "skipped": "skipped",
    "expected_skip": "expected_gap",
    "expected_gap": "expected_gap",
    "unsupported": "unsupported_by_design",
    "unsupported_by_design": "unsupported_by_design",
    "infra": "infrastructure_error",
    "infrastructure_error": "infrastructure_error",
}

# Statuses that are already canonical and pass through unchanged.
_STATUS_PASSTHROUGH = frozenset(
    {
        "pass",
        "fail",
        "warning",
        "manual",
        "not_applicable",
        "waiver_applied",
        "blocked",
        "unknown",
    }
)


def main() -> int:
    """Run the wrapper for the context file given via ``--context``.

    Exactly one JSON result object is printed to stdout. The function
    returns 0 on every path it handles; the outcome is carried by the
    ``result`` field of the emitted JSON, not by the exit status.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--context", required=True)
    args = parser.parse_args()

    context = _load_context(Path(args.context))
    selected_group = context["step"].get("check_group")
    config = _opencmis_policy(context)

    dependency_results = {
        "java": _probe_command(["java", "-version"]),
        "maven": _probe_command(["mvn", "-version"]),
    }
    missing = [
        name for name, result in dependency_results.items() if not result["available"]
    ]
    if config.get("requires_java_maven", True) and missing:
        _emit(
            {
                "result": "blocked",
                "observations": [
                    "OpenCMIS TCK execution skipped because required local dependencies are missing: "
                    + ", ".join(missing)
                    + "."
                ],
                "facts": {
                    "blocked_reason": "missing_dependency",
                    "selected_check_group": selected_group,
                    "dependencies": dependency_results,
                },
                "artifact_refs": [],
            }
        )
        return 0

    command_template = _configured_command(config)
    if command_template is not None:
        _emit(
            _run_configured_tck(
                context, selected_group, dependency_results, command_template
            )
        )
        return 0

    _emit(
        {
            "result": "blocked",
            "observations": [
                "Java and Maven are available, but the Apache Chemistry OpenCMIS TCK invocation is not configured yet."
            ],
            "facts": {
                "blocked_reason": "tck_invocation_not_configured",
                "selected_check_group": selected_group,
                "dependencies": dependency_results,
                "next_step": "Configure runtime_policy.opencmis_tck.command or OPENCMIS_TCK_COMMAND_JSON with an argv list for the local Apache Chemistry TCK runner.",
            },
            "artifact_refs": [],
        }
    )
    return 0


def _load_context(path: Path) -> dict[str, Any]:
    """Read the JSON context file at *path*.

    Raises:
        ValueError: if the top-level JSON value is not an object.
    """
    with path.open("r", encoding="utf-8") as handle:
        value = json.load(handle)
    if not isinstance(value, dict):
        raise ValueError("context must be a JSON object")
    return value


def _probe_command(command: list[str]) -> dict[str, Any]:
    """Check whether ``command[0]`` is on PATH and runs successfully.

    Returns a dict with the keys ``available``, ``path``, ``returncode`` and
    ``version_output`` (combined stdout/stderr, truncated to 2000 characters).
    A probe that times out or cannot be started is reported as unavailable
    instead of raising, so the caller can still emit a result.
    """
    executable = shutil.which(command[0])
    if executable is None:
        return {
            "available": False,
            "path": None,
            "returncode": None,
            "version_output": None,
        }
    try:
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=10,
            check=False,
        )
    except (subprocess.TimeoutExpired, OSError) as exc:
        return {
            "available": False,
            "path": executable,
            "returncode": None,
            "version_output": f"{type(exc).__name__}: {exc}"[:2000],
        }
    # Both streams are kept: either one may carry the version banner.
    output = "\n".join(
        part.strip() for part in [completed.stdout, completed.stderr] if part.strip()
    )
    return {
        "available": completed.returncode == 0,
        "path": executable,
        "returncode": completed.returncode,
        "version_output": output[:2000],
    }


def _run_configured_tck(
    context: dict[str, Any],
    selected_group: str | None,
    dependency_results: dict[str, dict[str, Any]],
    command_template: list[str],
) -> dict[str, Any]:
    """Execute the configured TCK command and normalize its outcome.

    Artifacts (invocation record, stdout/stderr logs, normalized result) are
    written under ``<run_dir>/artifacts/open-cmis-tck/tck/<group>`` and
    referenced relative to ``run_dir`` in the returned ``artifact_refs``.

    Failure to start the command or a timeout is returned as a result dict
    (``blocked`` / ``infrastructure_error``) rather than raised.
    """
    run_dir = Path(context["run_dir"])
    artifact_dir = (
        run_dir
        / "artifacts"
        / "open-cmis-tck"
        / "tck"
        / _safe_id(selected_group or "unknown")
    )
    artifact_dir.mkdir(parents=True, exist_ok=True)
    command = [
        _expand_arg(arg, context, selected_group, artifact_dir)
        for arg in command_template
    ]
    timeout = _timeout_seconds(context)
    invocation_ref = _write_json_artifact(
        run_dir,
        artifact_dir,
        "invocation.json",
        {
            "selected_check_group": selected_group,
            "command": command,
            "target_profile_id": context["target_profile"]["id"],
            "repository_id": _repository_id(context),
            "browser_binding_url": _browser_url(context),
            "credentials_ref": _credentials_ref(context),
        },
    )
    try:
        completed = subprocess.run(
            command,
            cwd=Path(context["extension_path"]),
            capture_output=True,
            text=True,
            # Undecodable bytes in tool output must not abort the run.
            errors="replace",
            timeout=timeout,
            check=False,
        )
    except FileNotFoundError as exc:
        return {
            "result": "blocked",
            "observations": [
                f"OpenCMIS TCK command could not start because {exc.filename!r} was not found."
            ],
            "facts": {
                "blocked_reason": "missing_command",
                "selected_check_group": selected_group,
                "dependencies": dependency_results,
                "command": command,
            },
            "artifact_refs": [invocation_ref],
        }
    except subprocess.TimeoutExpired as exc:
        artifact_refs = [invocation_ref]
        # Keep whatever output was captured before the timeout for diagnosis.
        for name, captured in (("stdout.log", exc.stdout), ("stderr.log", exc.stderr)):
            partial = _coerce_text(captured)
            if partial:
                artifact_refs.append(
                    _write_text_artifact(run_dir, artifact_dir, name, partial)
                )
        return {
            "result": "infrastructure_error",
            "observations": [
                f"OpenCMIS TCK command timed out after {timeout} seconds."
            ],
            "facts": {
                "selected_check_group": selected_group,
                "dependencies": dependency_results,
                "command": command,
            },
            "artifact_refs": artifact_refs,
        }
    except OSError as exc:
        # e.g. PermissionError for a non-executable file.
        return {
            "result": "infrastructure_error",
            "observations": [f"OpenCMIS TCK command could not start: {exc}"],
            "facts": {
                "selected_check_group": selected_group,
                "dependencies": dependency_results,
                "command": command,
            },
            "artifact_refs": [invocation_ref],
        }

    stdout_ref = _write_text_artifact(
        run_dir, artifact_dir, "stdout.log", completed.stdout
    )
    stderr_ref = _write_text_artifact(
        run_dir, artifact_dir, "stderr.log", completed.stderr
    )
    normalized = _normalize_tck_output(completed, artifact_dir, selected_group)
    normalized["facts"].update(
        {
            "selected_check_group": selected_group,
            "dependencies": dependency_results,
            "command": command,
            "returncode": completed.returncode,
            "browser_binding_url": _browser_url(context),
            "repository_id": _repository_id(context),
        }
    )
    # The normalized artifact is written before the wrapper's own refs are
    # appended, so the file on disk holds only the refs from normalization.
    normalized_ref = _write_json_artifact(
        run_dir,
        artifact_dir,
        "normalized-runner-result.json",
        normalized,
    )
    normalized["artifact_refs"].extend(
        [invocation_ref, stdout_ref, stderr_ref, normalized_ref]
    )
    return normalized


def _coerce_text(value: str | bytes | None) -> str:
    """Return captured process output as text; ``None`` becomes ``""``."""
    if value is None:
        return ""
    if isinstance(value, bytes):
        return value.decode("utf-8", errors="replace")
    return value


def _normalize_tck_output(
    completed: subprocess.CompletedProcess[str],
    artifact_dir: Path,
    selected_group: str | None,
) -> dict[str, Any]:
    """Pick a normalizer for the finished command.

    Order of preference: a JSON object on stdout, then the first ``*.xml``
    file (sorted by path) in *artifact_dir*, then the bare exit code.
    """
    parsed_stdout = _parse_json(completed.stdout)
    if isinstance(parsed_stdout, dict):
        return _normalize_json_result(
            parsed_stdout, completed.returncode, selected_group
        )

    junit_files = sorted(artifact_dir.glob("*.xml"))
    if junit_files:
        return _normalize_junit_result(
            junit_files[0], completed.returncode, selected_group
        )

    if completed.returncode == 0:
        return {
            "result": "pass",
            "observations": [
                "OpenCMIS TCK command completed successfully, but no structured result payload was found."
            ],
            "facts": {
                "normalizer": "exit-code",
                "result_counts": {"pass": 1},
            },
            "artifact_refs": [],
        }
    return {
        "result": "fail",
        "observations": [
            "OpenCMIS TCK command exited with a non-zero status and no structured result payload was found."
        ],
        "facts": {
            "normalizer": "exit-code",
            "result_counts": {"fail": 1},
        },
        "artifact_refs": [],
    }


def _normalize_json_result(
    payload: dict[str, Any],
    returncode: int,
    selected_group: str | None,
) -> dict[str, Any]:
    """Normalize a JSON object printed by the TCK command.

    If the payload carries a case list (see ``_json_cases``), per-case
    statuses are counted and aggregated. Otherwise the payload's own
    ``result`` field is used, downgraded to ``infrastructure_error`` when the
    process exited non-zero but claimed pass/warning/skipped.
    """
    artifact_refs = _artifact_refs_from_payload(payload)
    source_facts = payload.get("facts", {})
    if not isinstance(source_facts, dict):
        source_facts = {}

    cases = _json_cases(payload)
    if cases:
        counts: dict[str, int] = {}
        normalized_cases = []
        for case in cases:
            status = _normalize_case_status(str(case.get("status", "unknown")))
            counts[status] = counts.get(status, 0) + 1
            normalized_cases.append(
                {
                    "id": str(case.get("id", case.get("name", "unnamed"))),
                    "status": status,
                    "message": str(case.get("message", case.get("reason", ""))),
                }
            )
        return {
            "result": _aggregate_result(counts, returncode),
            "observations": [
                f"OpenCMIS TCK group {selected_group!r} produced {sum(counts.values())} normalized case result(s)."
            ],
            "facts": {
                **source_facts,
                "normalizer": "json-cases",
                "result_counts": counts,
                # Only the first 200 cases are embedded in the facts.
                "cases": normalized_cases[:200],
            },
            "artifact_refs": artifact_refs,
        }

    result = _normalize_case_status(str(payload.get("result", "unknown")))
    if returncode != 0 and result in {"pass", "warning", "skipped"}:
        result = "infrastructure_error"
    return {
        "result": result,
        "observations": _observations_from_payload(payload, selected_group),
        "facts": {
            **source_facts,
            "normalizer": "json-runner-result",
            "result_counts": {result: 1},
            "payload": payload,
        },
        "artifact_refs": artifact_refs,
    }


def _normalize_junit_result(
    path: Path,
    returncode: int,
    selected_group: str | None,
) -> dict[str, Any]:
    """Normalize a JUnit-style XML report at *path*.

    Counters are summed over the root ``testsuite`` element, or over the
    ``testsuite`` children of any other root. A file that cannot be parsed,
    or whose counters are not integers, yields the same empty-counts
    aggregate as a report without suites, plus a ``junit_parse_error`` fact.
    """
    try:
        root = ET.parse(path).getroot()
        suites = [root] if root.tag == "testsuite" else list(root.findall("testsuite"))
        total = sum(int(suite.get("tests", "0")) for suite in suites)
        failures = sum(int(suite.get("failures", "0")) for suite in suites)
        errors = sum(int(suite.get("errors", "0")) for suite in suites)
        skipped = sum(int(suite.get("skipped", "0")) for suite in suites)
    except (ET.ParseError, ValueError, OSError) as exc:
        return {
            "result": _aggregate_result({}, returncode),
            "observations": [
                f"OpenCMIS TCK group {selected_group!r} produced an XML file that could not be read as JUnit-style results: {exc}"
            ],
            "facts": {
                "normalizer": "junit-xml",
                "result_counts": {},
                "junit_xml": str(path),
                "junit_parse_error": str(exc),
            },
            "artifact_refs": [],
        }

    passed = max(0, total - failures - errors - skipped)
    counts = {
        "pass": passed,
        "fail": failures + errors,
        "skipped": skipped,
    }
    # Zero-valued buckets are dropped so they do not affect aggregation.
    counts = {key: value for key, value in counts.items() if value}
    return {
        "result": _aggregate_result(counts, returncode),
        "observations": [
            f"OpenCMIS TCK group {selected_group!r} produced JUnit-style XML results."
        ],
        "facts": {
            "normalizer": "junit-xml",
            "result_counts": counts,
            "junit_xml": str(path),
        },
        "artifact_refs": [],
    }


def _json_cases(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Return the first of ``tests``/``cases``/``results`` that is a list of dicts."""
    for key in ("tests", "cases", "results"):
        value = payload.get(key)
        if isinstance(value, list) and all(isinstance(item, dict) for item in value):
            return value
    return []


def _artifact_refs_from_payload(payload: dict[str, Any]) -> list[str]:
    """Return the non-empty string entries of ``payload["artifact_refs"]``."""
    refs = payload.get("artifact_refs", [])
    if not isinstance(refs, list):
        return []
    return [ref for ref in refs if isinstance(ref, str) and ref]


def _aggregate_result(counts: dict[str, int], returncode: int) -> str:
    """Collapse per-status counts into one result.

    The first status present, in the precedence order below, wins. If none
    is present the result is ``infrastructure_error`` for a non-zero
    *returncode* and ``unknown`` otherwise.
    """
    for status in (
        "infrastructure_error",
        "fail",
        "pass",
        "expected_gap",
        "unsupported_by_design",
        "skipped",
    ):
        if counts.get(status):
            return status
    return "infrastructure_error" if returncode else "unknown"


def _normalize_case_status(value: str) -> str:
    """Map a free-form status string onto the canonical status vocabulary.

    Matching is case-insensitive and treats ``-`` and spaces as ``_``.
    Unrecognized values become ``unknown``.
    """
    normalized = value.strip().lower().replace("-", "_").replace(" ", "_")
    if normalized in _STATUS_ALIASES:
        return _STATUS_ALIASES[normalized]
    if normalized in _STATUS_PASSTHROUGH:
        return normalized
    return "unknown"


def _observations_from_payload(
    payload: dict[str, Any], selected_group: str | None
) -> list[str]:
    """Return observations from the payload, falling back to a generic line.

    Uses ``observations`` when it is a list (items stringified), otherwise a
    non-empty string ``message``, otherwise a generic sentence.
    """
    observations = payload.get("observations")
    if isinstance(observations, list):
        return [str(item) for item in observations]
    message = payload.get("message")
    if isinstance(message, str) and message:
        return [message]
    return [f"OpenCMIS TCK group {selected_group!r} returned a structured result."]


def _opencmis_policy(context: dict[str, Any]) -> dict[str, Any]:
    """Return ``assessment_profile.runtime_policy.opencmis_tck`` or ``{}``."""
    policy = (
        context["assessment_profile"].get("runtime_policy", {}).get("opencmis_tck", {})
    )
    return policy if isinstance(policy, dict) else {}


def _is_argv(value: Any) -> bool:
    """Return True for a non-empty list of non-empty strings."""
    return (
        isinstance(value, list)
        and bool(value)
        and all(isinstance(item, str) and item for item in value)
    )


def _configured_command(config: dict[str, Any]) -> list[str] | None:
    """Resolve the TCK command template.

    Sources, in order: ``config["command"]``, the ``OPENCMIS_TCK_COMMAND_JSON``
    environment variable (a JSON array of strings), then
    ``OPENCMIS_TCK_COMMAND`` (split with ``shlex``). An empty command is
    treated as not configured, because an empty argv cannot be executed.

    Raises:
        ValueError: if an environment variable is set but malformed.
    """
    command = config.get("command")
    if _is_argv(command):
        return command

    env_json = os.environ.get("OPENCMIS_TCK_COMMAND_JSON")
    if env_json:
        try:
            parsed = json.loads(env_json)
        except json.JSONDecodeError as exc:
            raise ValueError(
                "OPENCMIS_TCK_COMMAND_JSON must be a non-empty JSON string array"
            ) from exc
        if _is_argv(parsed):
            return parsed
        raise ValueError(
            "OPENCMIS_TCK_COMMAND_JSON must be a non-empty JSON string array"
        )

    env_command = os.environ.get("OPENCMIS_TCK_COMMAND")
    if env_command:
        try:
            argv = shlex.split(env_command)
        except ValueError as exc:
            raise ValueError(
                "OPENCMIS_TCK_COMMAND is not a valid shell-style command line"
            ) from exc
        # Whitespace-only input splits to [], which counts as unconfigured.
        return argv or None
    return None


def _expand_arg(
    value: str,
    context: dict[str, Any],
    selected_group: str | None,
    artifact_dir: Path,
) -> str:
    """Substitute ``{placeholder}`` tokens in one command argument.

    Substitution is a single pass: text inserted for one placeholder is never
    re-scanned, so a value that happens to contain ``{check_group}`` stays
    literal. Unknown placeholders are left untouched.
    """
    substitutions = {
        "run_dir": str(context["run_dir"]),
        "artifact_dir": str(artifact_dir),
        "extension_path": str(context["extension_path"]),
        "browser_url": _browser_url(context) or "",
        "repository_id": _repository_id(context) or "",
        "credentials_ref": _credentials_ref(context) or "",
        "target_profile_dir": _target_profile_dir(context) or "",
        "check_group": selected_group or "",
        "target_id": str(context["target_profile"]["id"]),
        "timeout_seconds": str(int(_timeout_seconds(context))),
    }
    return _PLACEHOLDER_PATTERN.sub(
        lambda match: substitutions.get(match.group(1), match.group(0)),
        value,
    )


def _browser_url(context: dict[str, Any]) -> str | None:
    """Return the URL of the first ``cmis-browser`` endpoint, if it is a string."""
    for endpoint in context["target_profile"].get("endpoints", []):
        if isinstance(endpoint, dict) and endpoint.get("binding") == "cmis-browser":
            url = endpoint.get("url")
            return url if isinstance(url, str) else None
    return None


def _repository_id(context: dict[str, Any]) -> str | None:
    """Return the policy's ``repository_id`` when it is a string."""
    value = _opencmis_policy(context).get("repository_id")
    return value if isinstance(value, str) else None


def _credentials_ref(context: dict[str, Any]) -> str | None:
    """Return the target profile's ``credentials_ref`` when it is a string."""
    value = context["target_profile"].get("credentials_ref")
    return value if isinstance(value, str) else None


def _target_profile_dir(context: dict[str, Any]) -> str | None:
    """Return the resolved parent directory of the target profile path, if set."""
    path = context["plan"].get("profile_paths", {}).get("target_profile_path")
    if not isinstance(path, str) or not path:
        return None
    return str(Path(path).resolve().parent)


def _timeout_seconds(context: dict[str, Any]) -> float:
    """Return the command timeout in seconds (at least 1.0).

    The ``opencmis_tck`` policy value takes precedence over the general
    runtime policy value. Booleans, non-numeric values and non-finite numbers
    fall back to the 300 second default.
    """
    runtime_policy = context["assessment_profile"].get("runtime_policy", {})
    opencmis_policy = _opencmis_policy(context)
    configured = opencmis_policy.get(
        "timeout_seconds",
        runtime_policy.get("timeout_seconds", _DEFAULT_TIMEOUT_SECONDS),
    )
    # bool is a subclass of int; True must not be read as a 1 second timeout.
    if isinstance(configured, bool) or not isinstance(configured, (int, float)):
        return _DEFAULT_TIMEOUT_SECONDS
    try:
        seconds = float(configured)
    except OverflowError:
        return _DEFAULT_TIMEOUT_SECONDS
    if not math.isfinite(seconds):
        return _DEFAULT_TIMEOUT_SECONDS
    return max(1.0, seconds)


def _write_text_artifact(
    run_dir: Path, artifact_dir: Path, name: str, value: str
) -> str:
    """Write *value* to ``artifact_dir/name``; return the path relative to *run_dir*."""
    path = artifact_dir / name
    path.write_text(value, encoding="utf-8")
    return str(path.relative_to(run_dir))


def _write_json_artifact(
    run_dir: Path,
    artifact_dir: Path,
    name: str,
    value: dict[str, Any],
) -> str:
    """Write *value* as sorted, indented JSON; return the path relative to *run_dir*."""
    path = artifact_dir / name
    path.write_text(
        json.dumps(value, indent=2, sort_keys=True) + "\n", encoding="utf-8"
    )
    return str(path.relative_to(run_dir))


def _parse_json(value: str) -> Any:
    """Parse *value* as JSON, returning ``None`` when it is not valid JSON."""
    try:
        return json.loads(value)
    except json.JSONDecodeError:
        return None


def _safe_id(value: str) -> str:
    """Replace every character that is not alphanumeric, ``-`` or ``_`` with ``_``."""
    return "".join(
        char if char.isalnum() or char in {"-", "_"} else "_" for char in value
    )


def _emit(value: dict[str, Any]) -> None:
    """Print *value* to stdout as sorted, indented JSON."""
    print(json.dumps(value, indent=2, sort_keys=True))


if __name__ == "__main__":
    raise SystemExit(main())