""" SSH-based sandbox: provision an isolated directory on the remote host, rsync the repo into it, and run arbitrary commands there. """ from __future__ import annotations import subprocess import sys import time import uuid from dataclasses import dataclass, field from pathlib import Path @dataclass class Sandbox: host: str repo_path: Path sandbox_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8]) ssh_user: str = "root" ssh_key: str | None = None # path to private key; None = ssh default base_dir: str = "/tmp/custodian-e2e" @property def remote_dir(self) -> str: return f"{self.base_dir}/{self.sandbox_id}" @property def ssh_target(self) -> str: return f"{self.ssh_user}@{self.host}" # ── low-level helpers ──────────────────────────────────────────────────── def _ssh_args(self) -> list[str]: args = ["ssh", "-o", "StrictHostKeyChecking=no", "-o", "BatchMode=yes", "-o", "ConnectTimeout=15"] if self.ssh_key: args += ["-i", self.ssh_key] args.append(self.ssh_target) return args def run(self, cmd: str, *, timeout: int = 60, stream: bool = False) -> tuple[int, str]: """Run a shell command on the remote host. Returns (exit_code, stdout+stderr).""" full_cmd = self._ssh_args() + [cmd] if stream: proc = subprocess.Popen(full_cmd, text=True, stdout=sys.stdout, stderr=sys.stderr) proc.wait(timeout=timeout) return proc.returncode, "" else: result = subprocess.run(full_cmd, capture_output=True, text=True, timeout=timeout) return result.returncode, result.stdout + result.stderr # ── lifecycle ──────────────────────────────────────────────────────────── def provision(self) -> None: """Create the remote sandbox directory and rsync the repo into it.""" print(f"[sandbox] provisioning {self.remote_dir} on {self.host}") rc, out = self.run(f"mkdir -p {self.remote_dir}") if rc != 0: raise RuntimeError(f"Failed to create remote sandbox dir: {out}") rsync_args = [ "rsync", "-az", "--delete", "--exclude=.git", "--exclude=__pycache__", "--exclude=*.pyc", "--exclude=.venv", "--exclude=node_modules", ] if self.ssh_key: rsync_args += ["-e", f"ssh -i {self.ssh_key} -o StrictHostKeyChecking=no"] else: rsync_args += ["-e", "ssh -o StrictHostKeyChecking=no"] rsync_args += [ f"{self.repo_path}/", f"{self.ssh_target}:{self.remote_dir}/", ] print(f"[sandbox] rsyncing {self.repo_path} → remote:{self.remote_dir}") result = subprocess.run(rsync_args, capture_output=True, text=True, timeout=120) if result.returncode != 0: raise RuntimeError(f"rsync failed: {result.stdout + result.stderr}") def teardown(self) -> None: """Remove the remote sandbox directory.""" print(f"[sandbox] tearing down {self.remote_dir}") rc, out = self.run(f"rm -rf {self.remote_dir}", timeout=30) if rc != 0: print(f"[sandbox] WARNING: teardown failed (manual cleanup may be needed): {out}") def wait_for_url(self, url: str, timeout: int = 120, interval: int = 5) -> bool: """Poll a URL on the remote host until it returns HTTP 2xx or timeout.""" print(f"[sandbox] waiting for {url} (timeout={timeout}s)") deadline = time.time() + timeout while time.time() < deadline: rc, _ = self.run( f"curl -sf --max-time 5 {url} > /dev/null 2>&1", timeout=15, ) if rc == 0: print(f"[sandbox] {url} is up") return True time.sleep(interval) print(f"[sandbox] TIMEOUT waiting for {url}") return False