Implement SAND-WP-0005: extension SDK and ext.vm-packer

Add SandboxExtension base class, extension SDK docs, vm-packer attach mode
for build-machines VMs, profile.vm-haskell-build, SSH port support, tests,
and migration docs.
This commit is contained in:
2026-06-24 01:47:07 +02:00
parent c8126672ee
commit cec0fc6348
20 changed files with 679 additions and 16 deletions

View File

@@ -0,0 +1,125 @@
"""ext.vm-packer — attach to pre-built VMs (build-machines lineage).
v0 supports **attach** mode only: connect via SSH to an existing VM (tunnel alias,
localhost:port, or direct host). Creates an isolated workspace directory; teardown
removes the workspace, not the VM.
Full Packer build / OVA import orchestration is deferred — operators still build
images via the-custodian/infra/build-machines/ workflows.
"""
from __future__ import annotations
import os
from pathlib import Path
from typing import Any
from sandboxer.extensions.base import SandboxExtension
from sandboxer.extensions.ssh import SSHConfig
from sandboxer.models import Profile
class VMPackerExtension(SandboxExtension):
"""Attach sandbox workspace on a pre-provisioned VM."""
def __init__(self, config: dict[str, Any] | None = None) -> None:
super().__init__(config)
self.workspace_base: str = self.config.get("workspace_base", "/build")
self.default_user: str | None = self.config.get("ssh_user")
self.ready_timeout_s: int = int(self.config.get("ready_timeout_s", 30))
def _ssh_from_handle(self, handle: dict[str, str]) -> SSHConfig:
port_raw = handle.get("ssh_port") or os.environ.get("SANDBOXER_VM_SSH_PORT")
port = int(port_raw) if port_raw else None
user = handle.get("ssh_user") or self.default_user
host = handle.get("vm_host") or handle.get("host") or "localhost"
return SSHConfig(
host=host,
user=user,
key=os.environ.get("SANDBOXER_SSH_KEY"),
port=port,
)
def _resolve_vm_host(
self, inputs: dict[str, str], placement_host: str
) -> tuple[str, int | None]:
"""Return SSH host and optional port for attach mode."""
if inputs.get("ssh_port"):
return inputs.get("vm_host") or placement_host or "localhost", int(inputs["ssh_port"])
if inputs.get("tunnel_port"):
return "localhost", int(inputs["tunnel_port"])
env_port = os.environ.get("SANDBOXER_VM_TUNNEL_PORT")
if env_port and (placement_host in ("localhost", "127.0.0.1", "workstation")):
return "localhost", int(env_port)
return inputs.get("vm_host") or placement_host, None
def provision(
self, profile: Profile, inputs: dict[str, str], host: str
) -> dict[str, str]:
vm_target = inputs.get("vm") or inputs.get("ssh_target")
if not vm_target:
raise ValueError("inputs.vm or inputs.ssh_target is required for ext.vm-packer")
sandbox_id = self.new_sandbox_id(inputs)
remote_dir = inputs.get("workspace_dir") or f"{self.workspace_base}/sbx-{sandbox_id}"
vm_host, ssh_port = self._resolve_vm_host(inputs, host)
ssh_user = inputs.get("ssh_user") or self.default_user
if "@" in vm_target or "." not in vm_target:
ssh = SSHConfig(host=vm_target, user=ssh_user, port=ssh_port)
connect_host = vm_target
else:
ssh = SSHConfig(host=vm_host, user=ssh_user, port=ssh_port)
connect_host = vm_host
rc, out = ssh.run(f"mkdir -p {remote_dir}")
if rc != 0:
raise RuntimeError(f"Failed to create workspace on VM: {out}")
repo_path_str = ""
repo = inputs.get("repo")
if repo:
repo_path = Path(repo).expanduser().resolve()
if not repo_path.exists():
raise FileNotFoundError(f"Repo path does not exist: {repo_path}")
ssh.rsync(repo_path, remote_dir)
repo_path_str = str(repo_path)
return {
"sandbox_id": sandbox_id,
"host": host,
"vm_host": connect_host,
"vm_target": vm_target,
"remote_dir": remote_dir,
"ssh_user": ssh.user or "",
"ssh_port": str(ssh.port) if ssh.port else "",
"mode": inputs.get("mode", "attach"),
"repo": repo_path_str,
}
def wait_ready(self, handle: dict[str, str]) -> dict[str, str]:
ssh = self._ssh_from_handle(handle)
remote_dir = handle["remote_dir"]
cmd = f"test -d {remote_dir} && echo ready"
rc, out = ssh.run(cmd, timeout=self.ready_timeout_s)
if rc != 0 or "ready" not in out:
raise RuntimeError(f"VM workspace not ready: {out}")
return {
"ssh": ssh.target,
"remote_dir": remote_dir,
"host": handle.get("host"),
}
def teardown(self, handle: dict[str, str]) -> dict[str, str]:
remote_dir = handle.get("remote_dir")
cleaned_dir = False
if remote_dir:
ssh = self._ssh_from_handle(handle)
rc, _ = ssh.run(f"rm -rf {remote_dir}", timeout=60)
cleaned_dir = rc == 0
return {
"workspace_removed": str(cleaned_dir),
"remote_dir": remote_dir or "",
"vm_preserved": "true",
}