"""Runner bridge for extension-provided checks."""

from __future__ import annotations

import importlib.util
import json
import os
import subprocess
import sys
from pathlib import Path
from types import ModuleType
from typing import Any, Callable

from guide_board.errors import ValidationError
from guide_board.io import load_json, write_json

RunnerCallable = Callable[[dict[str, Any]], dict[str, Any]]

# Timeout applied to command runners when the plan does not provide a usable one.
_DEFAULT_TIMEOUT_SECONDS = 300.0


def run_step(
    root: Path,
    run_dir: Path,
    run_id: str,
    plan: dict[str, Any],
    step: dict[str, Any],
) -> dict[str, Any]:
    """Execute the runner referenced by *step* and return its result record.

    The returned dict always has the keys ``result``, ``observations``,
    ``facts`` and ``artifact_refs``.

    Steps without a ``runner_ref`` are answered by ``_no_runner_result``.
    Otherwise the runner is looked up in the ``extension.json`` manifest of the
    extension snapshot named by ``step["extension_id"]`` and dispatched on the
    entrypoint's ``kind``.

    Raises:
        ValidationError: if the extension or runner is not declared, or the
            runner kind is not supported.
    """
    runner_ref = step.get("runner_ref")
    if runner_ref is None:
        return _no_runner_result(step)

    extension = _extension_snapshot(plan, step["extension_id"])
    extension_path = _snapshot_path(root, extension)
    manifest = load_json(extension_path / "extension.json")
    entrypoint = _runner_entrypoint(manifest, runner_ref)

    kind = entrypoint["kind"]
    if kind == "python_module":
        return _run_python_module(root, run_dir, run_id, plan, step, extension_path, entrypoint)
    if kind == "external":
        # External runners are declared by extensions but never executed here.
        return {
            "result": "blocked",
            "observations": [
                f"Runner {runner_ref!r} is declared as an external runner and is not implemented by the core."
            ],
            "facts": {
                "runner_ref": runner_ref,
                "runner_kind": "external",
            },
            "artifact_refs": [],
        }
    if kind == "command":
        return _run_command(root, run_dir, run_id, plan, step, extension_path, entrypoint)
    raise ValidationError(f"{runner_ref}: unsupported runner kind {kind!r}")


def _no_runner_result(step: dict[str, Any]) -> dict[str, Any]:
    """Return the placeholder result for a step that has no runner.

    ``check_group`` steps are reported as ``manual``; every other step kind is
    reported as ``skipped``.
    """
    result = "manual" if step["kind"] == "check_group" else "skipped"
    return {
        "result": result,
        "observations": [
            "No runner is configured for this step in the baseline core."
        ],
        "facts": {
            "runner_ref": None,
            "runner_kind": None,
        },
        "artifact_refs": [],
    }


def _runner_context(
    root: Path,
    run_dir: Path,
    run_id: str,
    plan: dict[str, Any],
    step: dict[str, Any],
    extension_path: Path,
    entrypoint: dict[str, Any],
) -> dict[str, Any]:
    """Build the context object handed to a runner.

    Paths are passed as strings. The key order is kept stable because the
    command runner serializes this dict to a JSON file.
    """
    return {
        "root": str(root),
        "run_dir": str(run_dir),
        "run_id": run_id,
        "plan": plan,
        "step": step,
        "target_profile": plan["target_profile_snapshot"],
        "assessment_profile": plan["assessment_profile_snapshot"],
        "extension_path": str(extension_path),
        "runner": entrypoint,
    }


def _normalized_evidence(
    payload: dict[str, Any],
) -> tuple[list[Any], dict[str, Any], list[Any]]:
    """Return ``(observations, facts, artifact_refs)`` from a runner payload.

    Runner output is not trusted to have the right shapes:

    * ``observations``: a list is kept, a tuple is converted to a list, and
      anything else becomes a single-item list holding its ``str()``.
    * ``facts``: anything that is not a dict is replaced by an empty dict.
    * ``artifact_refs``: a list is kept, a tuple is converted to a list, and
      anything else is replaced by an empty list.
    """
    observations = payload.get("observations", [])
    if isinstance(observations, tuple):
        observations = list(observations)
    elif not isinstance(observations, list):
        observations = [str(observations)]

    facts = payload.get("facts", {})
    if not isinstance(facts, dict):
        facts = {}

    artifact_refs = payload.get("artifact_refs", [])
    if isinstance(artifact_refs, tuple):
        artifact_refs = list(artifact_refs)
    elif not isinstance(artifact_refs, list):
        artifact_refs = []

    return observations, facts, artifact_refs


def _run_python_module(
    root: Path,
    run_dir: Path,
    run_id: str,
    plan: dict[str, Any],
    step: dict[str, Any],
    extension_path: Path,
    entrypoint: dict[str, Any],
) -> dict[str, Any]:
    """Run a ``python_module`` runner in-process.

    The module named by ``entrypoint["module_path"]`` is loaded from inside the
    extension directory and ``entrypoint["callable"]`` is invoked with the
    runner context. An exception raised by the callable is reported as an
    ``infrastructure_error`` result instead of propagating.

    Raises:
        ValidationError: if the entrypoint is incomplete, the module path
            escapes the extension directory, the module cannot be loaded, the
            callable is missing, or the callable does not return a dict.
    """
    module_path = entrypoint.get("module_path")
    callable_name = entrypoint.get("callable")
    if not module_path or not callable_name:
        raise ValidationError(f"{entrypoint['id']}: python_module runners need module_path and callable")

    # Resolve before the containment check so ".." segments and symlinks
    # cannot point outside the extension directory.
    module_file = (extension_path / module_path).resolve()
    try:
        module_file.relative_to(extension_path.resolve())
    except ValueError as exc:
        raise ValidationError(
            f"{entrypoint['id']}: module_path must stay inside the extension directory"
        ) from exc

    module = _load_module(module_file, entrypoint["id"])
    runner = getattr(module, callable_name, None)
    if not callable(runner):
        raise ValidationError(f"{entrypoint['id']}: callable {callable_name!r} was not found")

    context = _runner_context(root, run_dir, run_id, plan, step, extension_path, entrypoint)
    try:
        result = runner(context)
    except Exception as exc:  # noqa: BLE001 - extension failures become evidence.
        return {
            "result": "infrastructure_error",
            "observations": [
                f"Runner {entrypoint['id']!r} failed before producing evidence: {exc}"
            ],
            "facts": {
                "runner_ref": entrypoint["id"],
                "runner_kind": "python_module",
                "error_type": type(exc).__name__,
            },
            "artifact_refs": [],
        }

    if not isinstance(result, dict):
        raise ValidationError(f"{entrypoint['id']}: runner must return an object")

    # Apply the same shape normalization as command runners so both runner
    # kinds produce records with identical field types.
    observations, facts, artifact_refs = _normalized_evidence(result)
    return {
        "result": result.get("result", "unknown"),
        "observations": observations,
        "facts": facts,
        "artifact_refs": artifact_refs,
    }


def _run_command(
    root: Path,
    run_dir: Path,
    run_id: str,
    plan: dict[str, Any],
    step: dict[str, Any],
    extension_path: Path,
    entrypoint: dict[str, Any],
) -> dict[str, Any]:
    """Run a ``command`` runner as a subprocess and interpret its stdout.

    The runner context is written to
    ``<run_dir>/artifacts/runner-contexts/<safe step id>.json``. Placeholders
    in the command template are expanded, and the command is executed with the
    extension directory as working directory and ``<root>/src`` prepended to
    ``PYTHONPATH``. Stdout is expected to be a single JSON object.

    Outcomes:
        * executable not found -> ``blocked``
        * timeout -> ``infrastructure_error``
        * stdout is not a JSON object -> ``infrastructure_error`` on non-zero
          exit, else ``unknown``
        * non-zero exit with a ``pass``/``warning``/``manual``/``skipped``
          result -> downgraded to ``infrastructure_error``

    Raises:
        ValidationError: if ``entrypoint["command"]`` is not a non-empty list
            of strings.
    """
    command_template = entrypoint.get("command")
    if not isinstance(command_template, list) or not command_template:
        raise ValidationError(f"{entrypoint['id']}: command runners need a non-empty command")
    # Placeholder expansion uses str.replace, so reject non-string arguments
    # here with a clear manifest error instead of failing with AttributeError.
    if not all(isinstance(arg, str) for arg in command_template):
        raise ValidationError(f"{entrypoint['id']}: command runner arguments must be strings")

    context_path = run_dir / "artifacts" / "runner-contexts" / f"{_safe_id(step['id'])}.json"
    context = _runner_context(root, run_dir, run_id, plan, step, extension_path, entrypoint)
    write_json(context_path, context)
    # Every result from here on references the written context file.
    context_ref = str(context_path.relative_to(run_dir))

    command = [
        _expand_command_arg(arg, root, run_dir, extension_path, context_path)
        for arg in command_template
    ]
    timeout = _timeout_seconds(plan)

    env = os.environ.copy()
    src_path = str(root / "src")
    env["PYTHONPATH"] = (
        src_path
        if not env.get("PYTHONPATH")
        else f"{src_path}{os.pathsep}{env['PYTHONPATH']}"
    )

    try:
        completed = subprocess.run(
            command,
            cwd=extension_path,
            capture_output=True,
            text=True,
            timeout=timeout,
            check=False,
            env=env,
        )
    except FileNotFoundError as exc:
        return {
            "result": "blocked",
            "observations": [
                f"Command runner {entrypoint['id']!r} could not start: {exc.filename} was not found."
            ],
            "facts": {
                "runner_ref": entrypoint["id"],
                "runner_kind": "command",
                "blocked_reason": "missing_command",
                "command": command,
            },
            "artifact_refs": [context_ref],
        }
    except subprocess.TimeoutExpired:
        return {
            "result": "infrastructure_error",
            "observations": [
                f"Command runner {entrypoint['id']!r} timed out after {timeout} seconds."
            ],
            "facts": {
                "runner_ref": entrypoint["id"],
                "runner_kind": "command",
                "timeout_seconds": timeout,
                "command": command,
            },
            "artifact_refs": [context_ref],
        }

    parsed = _parse_runner_stdout(completed.stdout)
    if parsed is None:
        result = "infrastructure_error" if completed.returncode else "unknown"
        return {
            "result": result,
            "observations": [
                f"Command runner {entrypoint['id']!r} did not return a JSON result on stdout."
            ],
            "facts": {
                "runner_ref": entrypoint["id"],
                "runner_kind": "command",
                "returncode": completed.returncode,
                # Only the tail is kept to bound the size of the record.
                "stdout": completed.stdout[-4000:],
                "stderr": completed.stderr[-4000:],
                "command": command,
            },
            "artifact_refs": [context_ref],
        }

    observations, facts, artifact_refs = _normalized_evidence(parsed)
    # Core-provided facts win over same-named facts reported by the runner.
    facts.update(
        {
            "runner_ref": entrypoint["id"],
            "runner_kind": "command",
            "returncode": completed.returncode,
            "stderr": completed.stderr[-4000:],
        }
    )
    artifact_refs.append(context_ref)

    result = parsed.get("result", "unknown")
    # A non-zero exit contradicts a benign result, so do not trust it.
    if completed.returncode != 0 and result in {"pass", "warning", "manual", "skipped"}:
        result = "infrastructure_error"
        observations.append(
            f"Command runner {entrypoint['id']!r} exited with {completed.returncode}."
        )

    return {
        "result": result,
        "observations": observations,
        "facts": facts,
        "artifact_refs": artifact_refs,
    }


def _load_module(path: Path, runner_id: str) -> ModuleType:
    """Load the Python source file at *path* as a fresh module object.

    The module is named ``_guide_board_runner_<runner_id>`` with dashes
    replaced by underscores. It is registered in ``sys.modules`` before it is
    executed, as the importlib recipe for importing a source file does, so that
    code which looks up its own module by name while being defined can find it.
    The registration is removed again if executing the module fails.

    Raises:
        ValidationError: if the file does not exist or no loader is available.
    """
    if not path.exists():
        raise ValidationError(f"{runner_id}: module not found: {path}")

    module_name = f"_guide_board_runner_{runner_id.replace('-', '_')}"
    spec = importlib.util.spec_from_file_location(module_name, path)
    if spec is None or spec.loader is None:
        raise ValidationError(f"{runner_id}: unable to load module from {path}")

    module = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = module
    try:
        spec.loader.exec_module(module)
    except BaseException:
        # Do not leave a half-initialized module behind.
        sys.modules.pop(module_name, None)
        raise
    return module


def _extension_snapshot(plan: dict[str, Any], extension_id: str) -> dict[str, Any]:
    """Return the entry of ``plan["extension_snapshots"]`` with the given id.

    Raises:
        ValidationError: if no snapshot has that id.
    """
    for extension in plan["extension_snapshots"]:
        if extension["id"] == extension_id:
            return extension
    raise ValidationError(f"step references unknown extension {extension_id!r}")


def _snapshot_path(root: Path, extension: dict[str, Any]) -> Path:
    """Return the extension's path, resolving relative paths against *root*."""
    path = Path(extension["path"])
    return path if path.is_absolute() else root / path


def _runner_entrypoint(manifest: dict[str, Any], runner_ref: str) -> dict[str, Any]:
    """Return the manifest's runner entrypoint whose ``id`` equals *runner_ref*.

    Raises:
        ValidationError: if the manifest declares no such runner.
    """
    for entrypoint in manifest.get("runner_entrypoints", []):
        if entrypoint["id"] == runner_ref:
            return entrypoint
    raise ValidationError(f"{manifest['id']}: runner {runner_ref!r} is not declared")


def _expand_command_arg(
    arg: str,
    root: Path,
    run_dir: Path,
    extension_path: Path,
    context_path: Path,
) -> str:
    """Substitute the supported placeholders in one command argument.

    Supported placeholders: ``{root}``, ``{run_dir}``, ``{extension_path}``
    and ``{context_json}``.
    """
    return (
        arg.replace("{root}", str(root))
        .replace("{run_dir}", str(run_dir))
        .replace("{extension_path}", str(extension_path))
        .replace("{context_json}", str(context_path))
    )


def _timeout_seconds(plan: dict[str, Any]) -> float:
    """Return the command timeout from ``plan["runtime_policy"]`` in seconds.

    Falls back to 300 seconds when the policy is missing or is not a dict, or
    when ``timeout_seconds`` is missing or is not a real number. Booleans are
    rejected explicitly because ``bool`` is a subclass of ``int``. Values below
    one second are raised to one second.
    """
    runtime_policy = plan.get("runtime_policy")
    if not isinstance(runtime_policy, dict):
        # Covers both a missing key and an explicit null in the plan.
        return _DEFAULT_TIMEOUT_SECONDS

    timeout = runtime_policy.get("timeout_seconds", _DEFAULT_TIMEOUT_SECONDS)
    if isinstance(timeout, bool) or not isinstance(timeout, (int, float)):
        return _DEFAULT_TIMEOUT_SECONDS
    return max(1.0, float(timeout))


def _parse_runner_stdout(stdout: str) -> dict[str, Any] | None:
    """Parse a runner's stdout as a single JSON object.

    Returns ``None`` when stdout is blank, is not valid JSON, or decodes to
    something other than an object.
    """
    stripped = stdout.strip()
    if not stripped:
        return None
    try:
        parsed = json.loads(stripped)
    except json.JSONDecodeError:
        return None
    if not isinstance(parsed, dict):
        return None
    return parsed


def _safe_id(value: str) -> str:
    """Return *value* with every character except alphanumerics, ``-`` and ``_`` replaced by ``_``."""
    return "".join(char if char.isalnum() or char in {"-", "_"} else "_" for char in value)