Files
the-custodian/e2e-framework/sandbox.py
tegwick d061c777d1 chore(consistency): sync task status from DB [auto]
Updated by fix-consistency on 2026-03-27:
  - update .custodian-brief.md for the-custodian
2026-03-27 00:52:18 +01:00

109 lines
4.2 KiB
Python

"""
SSH-based sandbox: provision an isolated directory on the remote host,
rsync the repo into it, and run arbitrary commands there.
"""
from __future__ import annotations
import subprocess
import sys
import time
import uuid
from dataclasses import dataclass, field
from pathlib import Path
@dataclass
class Sandbox:
host: str
repo_path: Path
sandbox_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
ssh_user: str = "root"
ssh_key: str | None = None # path to private key; None = ssh default
base_dir: str = "/tmp/custodian-e2e"
@property
def remote_dir(self) -> str:
return f"{self.base_dir}/{self.sandbox_id}"
@property
def ssh_target(self) -> str:
return f"{self.ssh_user}@{self.host}"
# ── low-level helpers ────────────────────────────────────────────────────
def _ssh_args(self) -> list[str]:
args = ["ssh", "-o", "StrictHostKeyChecking=no",
"-o", "BatchMode=yes",
"-o", "ConnectTimeout=15"]
if self.ssh_key:
args += ["-i", self.ssh_key]
args.append(self.ssh_target)
return args
def run(self, cmd: str, *, timeout: int = 60, stream: bool = False) -> tuple[int, str]:
"""Run a shell command on the remote host. Returns (exit_code, stdout+stderr)."""
full_cmd = self._ssh_args() + [cmd]
if stream:
proc = subprocess.Popen(full_cmd, text=True,
stdout=sys.stdout, stderr=sys.stderr)
proc.wait(timeout=timeout)
return proc.returncode, ""
else:
result = subprocess.run(full_cmd, capture_output=True, text=True, timeout=timeout)
return result.returncode, result.stdout + result.stderr
# ── lifecycle ────────────────────────────────────────────────────────────
def provision(self) -> None:
"""Create the remote sandbox directory and rsync the repo into it."""
print(f"[sandbox] provisioning {self.remote_dir} on {self.host}")
rc, out = self.run(f"mkdir -p {self.remote_dir}")
if rc != 0:
raise RuntimeError(f"Failed to create remote sandbox dir: {out}")
rsync_args = [
"rsync", "-az", "--delete",
"--exclude=.git",
"--exclude=__pycache__",
"--exclude=*.pyc",
"--exclude=.venv",
"--exclude=node_modules",
]
if self.ssh_key:
rsync_args += ["-e", f"ssh -i {self.ssh_key} -o StrictHostKeyChecking=no"]
else:
rsync_args += ["-e", "ssh -o StrictHostKeyChecking=no"]
rsync_args += [
f"{self.repo_path}/",
f"{self.ssh_target}:{self.remote_dir}/",
]
print(f"[sandbox] rsyncing {self.repo_path} → remote:{self.remote_dir}")
result = subprocess.run(rsync_args, capture_output=True, text=True, timeout=120)
if result.returncode != 0:
raise RuntimeError(f"rsync failed: {result.stdout + result.stderr}")
def teardown(self) -> None:
"""Remove the remote sandbox directory."""
print(f"[sandbox] tearing down {self.remote_dir}")
rc, out = self.run(f"rm -rf {self.remote_dir}", timeout=30)
if rc != 0:
print(f"[sandbox] WARNING: teardown failed (manual cleanup may be needed): {out}")
def wait_for_url(self, url: str, timeout: int = 120, interval: int = 5) -> bool:
"""Poll a URL on the remote host until it returns HTTP 2xx or timeout."""
print(f"[sandbox] waiting for {url} (timeout={timeout}s)")
deadline = time.time() + timeout
while time.time() < deadline:
rc, _ = self.run(
f"curl -sf --max-time 5 {url} > /dev/null 2>&1",
timeout=15,
)
if rc == 0:
print(f"[sandbox] {url} is up")
return True
time.sleep(interval)
print(f"[sandbox] TIMEOUT waiting for {url}")
return False