Files
sand-boxer/src/sandboxer/core/manager.py
tegwick 774bc5ae0a feat: Packer build orchestration (SAND-WP-0012)
Add vm-packer build mode, profile.vm-packer-build, State Hub progress
notes during long provision, docs/runbook, and build mode tests.
2026-06-24 12:56:32 +02:00

509 lines
20 KiB
Python

"""Core sandbox establishment logic — harness-agnostic."""
from __future__ import annotations
from sandboxer.extensions.registry import load_extension, resolve_backend
from sandboxer.lifecycle.expire import (
ExpireCandidate,
apply_expired_state,
find_expire_candidates,
)
from sandboxer.lifecycle.state_hub import emit_lifecycle_event, event_type_for_state
from sandboxer.lifecycle.store import SandboxStore, utcnow
from sandboxer.lifecycle.ttl import expires_at_from, extend_expires_at, resolve_initial_ttl
from sandboxer.models import (
Consumer,
ExpireActionResult,
MeterRecord,
Reachability,
SandboxCreateRequest,
SandboxState,
SandboxStatus,
SnapshotRecord,
)
from sandboxer.payments.billing_export import export_meter_usage
from sandboxer.payments.credits import CreditsStore
from sandboxer.payments.metering import estimate_cost, settle_usage
from sandboxer.placement import resolve_host
from sandboxer.profiles.loader import load_profile
from sandboxer.reachability.enrich import enrich_reachability
from sandboxer.routing.resolver import resolve_extension
from sandboxer.secrets.resolver import resolve_setup_secrets
from sandboxer.snapshots.store import SnapshotStore
from sandboxer.telemetry.export import export_telemetry
from sandboxer.telemetry.introspection import (
build_introspection_report,
collect_host_snapshot,
profile_wants_telemetry,
)
class SandboxManager:
    """Drive sandbox lifecycle operations on top of three stores.

    The manager keeps a sandbox status store, a credits store and a snapshot
    store as attributes (``store``, ``credits``, ``snapshots``).
    """

    def __init__(
        self,
        store: SandboxStore | None = None,
        credits: CreditsStore | None = None,
        snapshots: SnapshotStore | None = None,
    ) -> None:
        """Wire up the collaborators, creating defaults for any left as None.

        Args:
            store: Sandbox status store; a new ``SandboxStore()`` when None.
            credits: Credits store; a new ``CreditsStore()`` when None.
            snapshots: Snapshot store; a new ``SnapshotStore()`` when None.
        """
        # Compare against None instead of relying on truthiness: an injected
        # collaborator that happens to be falsy (for example an empty store
        # that defines __len__) must not be swapped for a fresh default.
        self.store = store if store is not None else SandboxStore()
        self.credits = credits if credits is not None else CreditsStore()
        self.snapshots = snapshots if snapshots is not None else SnapshotStore()
@staticmethod
def _handle_from_status(status: SandboxStatus) -> dict[str, str]:
return {
"sandbox_id": status.sandbox_id,
"host": status.host or "",
"remote_dir": status.reachability.remote_dir if status.reachability else "",
"compose_project": status.reachability.compose_project if status.reachability else "",
"compose_file": status.inputs.get("compose_file", ""),
"ssh_user": status.inputs.get("ssh_user", ""),
"compose_cmd": status.inputs.get("compose_cmd", ""),
"ssh_port": status.inputs.get("ssh_port", ""),
"vm_target": status.inputs.get("vm_target", ""),
"vm_host": status.inputs.get("vm_host", ""),
"endpoint": status.inputs.get("endpoint", ""),
"provider_sandbox_id": status.inputs.get("provider_sandbox_id", ""),
"provider": status.inputs.get("provider", ""),
}
def _resolved_host(self, profile, extension, host_override: str | None) -> str:
    """Pick the host value recorded for a new sandbox.

    Extensions whose pricing model is ``"metered"`` report their configured
    ``provider`` (default ``"saas"``); every other extension goes through
    ``resolve_host`` with the optional override.
    """
    metered = extension.capabilities.pricing_model == "metered"
    if not metered:
        return resolve_host(profile, override=host_override)
    return extension.config.get("provider", "saas")
@staticmethod
def _assign_ttl(
    status: SandboxStatus,
    profile,
    *,
    request_ttl: str | None,
) -> None:
    """Set ``status.ttl`` and ``status.expires_at`` in place.

    The TTL string comes from ``resolve_initial_ttl(profile, request_ttl)``.
    The expiry is measured from ``status.ready_at`` when that is set, and
    from the current time otherwise.
    """
    ttl_value = resolve_initial_ttl(profile, request_ttl)
    start = status.ready_at if status.ready_at else utcnow()
    status.ttl = ttl_value
    status.expires_at = expires_at_from(start, ttl_value)
def create(self, request: SandboxCreateRequest, *, host: str | None = None) -> SandboxStatus:
    """Provision a sandbox for ``request`` and return its READY status.

    The flow is: resolve profile, extension and backend; check credits when
    a cost estimate exists; emit REQUESTED and PROVISIONING events; provision
    through the backend and wait for readiness; assign the TTL; optionally
    build and export a telemetry report; persist the status.

    Args:
        request: Profile id, inputs, consumer and optional TTL to provision.
        host: Optional host override forwarded to extension/host resolution.

    Raises:
        RuntimeError: If a cost estimate exists and credits cannot cover it.
        Exception: Any error raised while provisioning is re-raised after the
            status has been marked FAILED (and saved once a real sandbox id
            is known).
    """
    profile = load_profile(request.profile)
    extension = resolve_extension(profile, request.inputs, host_override=host)
    backend = resolve_backend(extension)
    target_host = self._resolved_host(profile, extension, host)
    telemetry_enabled = profile_wants_telemetry(profile)
    base_dir = extension.config.get("base_dir", "/tmp/sandboxer")

    # Refuse up front when the estimate exceeds the available credits.
    quote = estimate_cost(extension, profile, request.inputs)
    meter: MeterRecord | None = None
    if quote:
        if not self.credits.can_afford(quote.estimated_usd):
            raise RuntimeError(
                f"Insufficient credits: need {quote.estimated_usd:.4f} USD, "
                f"balance {self.credits.balance():.4f} USD"
            )
        meter = MeterRecord(
            pricing_model="metered",
            estimate_usd=quote.estimated_usd,
        )

    # The id stays "pending" until the backend returns a handle.
    started = utcnow()
    status = SandboxStatus(
        sandbox_id="pending",
        profile_id=profile.id,
        extension_id=extension.id,
        state=SandboxState.REQUESTED,
        consumer=request.consumer,
        host=target_host,
        inputs=dict(request.inputs),
        meter=meter,
        created_at=started,
        updated_at=started,
    )
    emit_lifecycle_event(status, event_type=event_type_for_state(status.state))

    status.state = SandboxState.PROVISIONING
    status.updated_at = utcnow()
    emit_lifecycle_event(status, event_type=event_type_for_state(status.state))

    # Host baseline for the telemetry report; skipped for metered extensions.
    baseline = None
    if telemetry_enabled and extension.capabilities.pricing_model != "metered":
        baseline = collect_host_snapshot(target_host)

    try:
        secret_refs = resolve_setup_secrets(profile)
        provision_inputs = dict(request.inputs)

        is_build = (
            provision_inputs.get("mode") == "build"
            or profile.id == "profile.vm-packer-build"
        )
        if is_build:
            # Progress note emitted before the backend provision call.
            emit_lifecycle_event(
                status,
                summary=f"Packer build starting ({profile.id})",
                event_type="note",
            )

        handle = backend.provision(profile, provision_inputs, target_host)
        if secret_refs:
            handle["_secret_refs"] = secret_refs

        # Persist the handle fields that _handle_from_status reads back later.
        status.sandbox_id = handle["sandbox_id"]
        for key in (
            "compose_file",
            "ssh_user",
            "compose_cmd",
            "ssh_port",
            "vm_target",
            "vm_host",
            "endpoint",
            "provider_sandbox_id",
            "provider",
        ):
            status.inputs[key] = handle.get(key, "")

        ready_info = backend.wait_ready(handle)
        ready_info = enrich_reachability(ready_info, profile, handle)
        status.reachability = Reachability(**ready_info)

        status.state = SandboxState.READY
        status.ready_at = utcnow()
        status.updated_at = status.ready_at
        self._assign_ttl(status, profile, request_ttl=request.ttl)

        if telemetry_enabled and baseline:
            after = collect_host_snapshot(target_host)
            report = build_introspection_report(
                host=target_host,
                sandbox_id=status.sandbox_id,
                profile=profile,
                provision_before=baseline,
                provision_after=after,
                store=self.store,
                base_dir=base_dir,
            )
            status.telemetry = report.model_dump(mode="json")
            export_telemetry(report)

        self.store.save(status)
        emit_lifecycle_event(status, event_type=event_type_for_state(status.state))
        return status
    except Exception as exc:
        status.state = SandboxState.FAILED
        status.error = str(exc)
        status.updated_at = utcnow()
        # Nothing is stored while the placeholder id is still in use.
        if status.sandbox_id != "pending":
            self.store.save(status)
        emit_lifecycle_event(
            status,
            summary=f"Sandbox provision failed: {exc}",
            event_type=event_type_for_state(status.state),
        )
        raise
def get(self, sandbox_id: str) -> SandboxStatus | None:
    """Return whatever ``self.store.get`` yields for ``sandbox_id``."""
    found = self.store.get(sandbox_id)
    return found
def reachability_report(self, sandbox_id: str) -> dict:
    """Build the reachability report for a stored sandbox.

    Raises:
        KeyError: If the store has no status for ``sandbox_id``.
    """
    found = self.store.get(sandbox_id)
    if not found:
        raise KeyError(f"Sandbox not found: {sandbox_id}")

    # Imported here, at call time, exactly as the original does.
    from sandboxer.reachability.enrich import build_reachability_report

    report = build_reachability_report(found)
    return report
def list(self) -> list[SandboxStatus]:
    """Return every stored sandbox status, newest ``created_at`` first."""

    def created(entry):
        return entry.created_at

    # NOTE: this method's name shadows the builtin ``list`` wherever the name
    # is visible, so the body deliberately avoids calling ``list(...)``.
    return sorted(self.store.list_all(), key=created, reverse=True)
def destroy(self, sandbox_id: str) -> SandboxStatus:
    """Tear down a sandbox and return its final status.

    An already DESTROYED sandbox is returned untouched. Otherwise the status
    is saved as DESTROYING, the backend tears the sandbox down, usage is
    settled (debiting credits for metered usage with a non-zero cost), an
    optional telemetry report is built, and the DESTROYED status is saved.

    Raises:
        KeyError: If the store has no status for ``sandbox_id``.
    """
    status = self.store.get(sandbox_id)
    if not status:
        raise KeyError(f"Sandbox not found: {sandbox_id}")
    if status.state == SandboxState.DESTROYED:
        return status

    profile = load_profile(status.profile_id)
    extension = load_extension(status.extension_id)
    backend = resolve_backend(extension)
    telemetry_enabled = profile_wants_telemetry(profile)
    base_dir = extension.config.get("base_dir", "/tmp/sandboxer")

    # Host baseline for the telemetry report; skipped for metered extensions.
    baseline = None
    if telemetry_enabled and status.host and extension.capabilities.pricing_model != "metered":
        baseline = collect_host_snapshot(status.host)

    # Persist the intermediate state before touching the backend.
    status.state = SandboxState.DESTROYING
    status.updated_at = utcnow()
    self.store.save(status)
    emit_lifecycle_event(status, event_type=event_type_for_state(status.state))

    handle = self._handle_from_status(status)
    backend.teardown(handle)

    status.state = SandboxState.DESTROYED
    status.destroyed_at = utcnow()
    status.updated_at = status.destroyed_at

    usage = settle_usage(status, extension, handle, destroyed_at=status.destroyed_at)
    if usage and usage.pricing_model == "metered" and usage.actual_usd:
        self.credits.debit(usage.actual_usd)
        status.meter = usage
        export_meter_usage(status, extension_id=extension.id, meter=usage)
        metered_summary = (
            f"Sandbox metered: {usage.actual_usd:.4f} USD "
            f"({usage.duration_s:.0f}s, ext={extension.id})"
        )
        emit_lifecycle_event(status, summary=metered_summary, event_type="note")

    if telemetry_enabled and baseline and status.host:
        after = collect_host_snapshot(status.host)
        report = build_introspection_report(
            host=status.host,
            sandbox_id=status.sandbox_id,
            profile=profile,
            destroy_before=baseline,
            destroy_after=after,
            store=self.store,
            base_dir=base_dir,
        )
        status.telemetry = report.model_dump(mode="json")
        export_telemetry(report)

    self.store.save(status)
    emit_lifecycle_event(status, event_type=event_type_for_state(status.state))
    return status
def recreate(self, sandbox_id: str) -> SandboxStatus:
    """Destroy a sandbox (unless already destroyed) and create it again.

    The new create request reuses the stored profile id, a copy of the stored
    inputs, the consumer and the TTL, and targets the previous host.

    Raises:
        KeyError: If the store has no status for ``sandbox_id``.
    """
    previous = self.store.get(sandbox_id)
    if not previous:
        raise KeyError(f"Sandbox not found: {sandbox_id}")

    # Build the replay request before the old sandbox is torn down.
    replay = SandboxCreateRequest(
        profile=previous.profile_id,
        inputs=dict(previous.inputs),
        consumer=previous.consumer,
        ttl=previous.ttl,
    )

    already_destroyed = previous.state == SandboxState.DESTROYED
    if not already_destroyed:
        self.destroy(sandbox_id)

    return self.create(replay, host=previous.host)
def extend_ttl(self, sandbox_id: str, duration: str) -> SandboxStatus:
    """Push back the expiry of a READY or ACTIVE sandbox.

    The new expiry and the applied value come from ``extend_expires_at``,
    which is given the profile's ``ttl.max`` as the maximum duration. The
    updated status is saved and a note event is emitted.

    Raises:
        KeyError: If the store has no status for ``sandbox_id``.
        RuntimeError: If the sandbox is not READY/ACTIVE, or it lacks
            ``expires_at`` or ``ready_at``.
    """
    status = self.store.get(sandbox_id)
    if not status:
        raise KeyError(f"Sandbox not found: {sandbox_id}")

    extendable_states = (SandboxState.READY, SandboxState.ACTIVE)
    if status.state not in extendable_states:
        raise RuntimeError(
            f"Cannot extend TTL for sandbox in state {status.state.value}"
        )
    if not (status.expires_at and status.ready_at):
        raise RuntimeError("Sandbox has no expiry metadata")

    profile = load_profile(status.profile_id)
    new_expires, applied = extend_expires_at(
        status.expires_at,
        anchor=status.ready_at,
        extension=duration,
        max_duration=profile.ttl.max,
    )

    status.expires_at = new_expires
    status.ttl = applied
    status.updated_at = utcnow()
    self.store.save(status)

    note = f"TTL extended by {applied} (expires {new_expires.isoformat()})"
    emit_lifecycle_event(status, summary=note, event_type="note")
    return status
def expire(
    self,
    *,
    apply: bool = False,
    now=None,
) -> list[ExpireActionResult]:
    """Report, and optionally destroy, sandboxes that are due to expire.

    With ``apply=False`` every candidate yields a ``"dry-run"`` result and
    nothing is changed. With ``apply=True`` each candidate that is still
    READY or ACTIVE is marked expired, saved, announced and destroyed,
    yielding ``"destroyed"``; any exception for a candidate is captured as a
    ``"failed"`` result instead of propagating. Candidates whose status is
    missing or in another state are skipped without producing a result.

    Args:
        apply: When False, only list what would be expired.
        now: Optional reference time forwarded to the lifecycle helpers.
    """
    candidates = find_expire_candidates(self.store, now=now)

    if not apply:
        return [
            ExpireActionResult(
                sandbox_id=candidate.sandbox_id,
                reason=candidate.reason,
                action="dry-run",
            )
            for candidate in candidates
        ]

    outcomes: list[ExpireActionResult] = []
    for candidate in candidates:
        try:
            status = self.store.get(candidate.sandbox_id)
            still_live = bool(status) and status.state in (
                SandboxState.READY,
                SandboxState.ACTIVE,
            )
            if not still_live:
                continue

            status = apply_expired_state(status, now=now)
            self.store.save(status)
            emit_lifecycle_event(
                status,
                summary=f"Sandbox expired ({candidate.reason})",
                event_type=event_type_for_state(status.state),
            )
            self.destroy(candidate.sandbox_id)
            outcome = ExpireActionResult(
                sandbox_id=candidate.sandbox_id,
                reason=candidate.reason,
                action="destroyed",
            )
        except Exception as exc:
            # One failing candidate must not stop the rest of the sweep.
            outcome = ExpireActionResult(
                sandbox_id=candidate.sandbox_id,
                reason=candidate.reason,
                action="failed",
                error=str(exc),
            )
        outcomes.append(outcome)
    return outcomes
def list_expire_candidates(self, *, now=None) -> list[ExpireCandidate]:
    """Return the expiry candidates found in ``self.store`` for ``now``."""
    candidates = find_expire_candidates(self.store, now=now)
    return candidates
def snapshot(self, sandbox_id: str, *, name: str | None = None) -> SnapshotRecord:
    """Snapshot a READY sandbox through its backend and record the result.

    Args:
        sandbox_id: Id of the sandbox to snapshot.
        name: Optional label stored on the snapshot record.

    Returns:
        The saved ``SnapshotRecord``.

    Raises:
        KeyError: If the store has no status for ``sandbox_id``.
        RuntimeError: If the sandbox is not READY or the backend does not
            support snapshots.
    """
    status = self.store.get(sandbox_id)
    if not status:
        raise KeyError(f"Sandbox not found: {sandbox_id}")
    if status.state != SandboxState.READY:
        raise RuntimeError(
            f"Sandbox must be ready to snapshot, got {status.state.value}"
        )

    extension = load_extension(status.extension_id)
    backend = resolve_backend(extension)
    if not backend.supports_snapshots():
        raise RuntimeError(f"Extension {extension.id} does not support snapshots")

    handle = self._handle_from_status(status)
    meta = backend.snapshot(handle)

    # The backend snapshot already exists at this point, so a size in an
    # unexpected form must not abort the bookkeeping. Accept a non-negative
    # int or an ASCII digit string (surrounding whitespace tolerated); any
    # other value, including a missing key or None, is recorded as unknown.
    size_raw = meta.get("size_bytes")
    size_bytes: int | None = None
    if isinstance(size_raw, int) and not isinstance(size_raw, bool):
        if size_raw >= 0:
            size_bytes = size_raw
    elif isinstance(size_raw, str):
        digits = size_raw.strip()
        # isascii() guards int(): str.isdigit() alone also accepts characters
        # such as superscript digits, which int() rejects.
        if digits.isascii() and digits.isdigit():
            size_bytes = int(digits)

    record = SnapshotRecord(
        snapshot_id=meta["snapshot_id"],
        sandbox_id=sandbox_id,
        profile_id=status.profile_id,
        extension_id=status.extension_id,
        host=status.host or meta.get("host", ""),
        artifact_path=meta.get("artifact_path", ""),
        handle=handle,
        inputs=dict(status.inputs),
        consumer=status.consumer,
        name=name,
        size_bytes=size_bytes,
        created_at=utcnow(),
    )
    self.snapshots.save(record)
    emit_lifecycle_event(
        status,
        summary=f"Snapshot {record.snapshot_id} created from sandbox {sandbox_id}",
        event_type="milestone",
    )
    return record
def get_snapshot(self, snapshot_id: str) -> SnapshotRecord | None:
    """Return whatever ``self.snapshots.get`` yields for ``snapshot_id``."""
    found = self.snapshots.get(snapshot_id)
    return found
def list_snapshots(self, *, sandbox_id: str | None = None) -> list[SnapshotRecord]:
    """Return snapshot records, newest ``created_at`` first.

    When ``sandbox_id`` is truthy only records for that sandbox are kept;
    None or an empty string returns all records.
    """

    def created(record):
        return record.created_at

    records = self.snapshots.list_all()
    if sandbox_id:
        selected = (r for r in records if r.sandbox_id == sandbox_id)
    else:
        selected = records
    return sorted(selected, key=created, reverse=True)
def restore(
    self,
    snapshot_id: str,
    *,
    host: str | None = None,
    consumer: Consumer | None = None,
) -> SandboxStatus:
    """Create a new sandbox from a stored snapshot and return its status.

    Args:
        snapshot_id: Id of the snapshot record to restore from.
        host: Optional host override; falls back to the host stored on the
            snapshot and then to ``resolve_host(profile)``.
        consumer: Optional consumer override; falls back to the consumer
            stored on the snapshot.

    Raises:
        KeyError: If no snapshot record exists for ``snapshot_id``.
        RuntimeError: If the backend does not support snapshots.
        ValueError: If no consumer is given and none is stored.
        Exception: Any error raised while restoring is re-raised after the
            status has been marked FAILED (and saved once a real sandbox id
            is known).
    """
    record = self.snapshots.get(snapshot_id)
    if not record:
        raise KeyError(f"Snapshot not found: {snapshot_id}")

    profile = load_profile(record.profile_id)
    extension = load_extension(record.extension_id)
    backend = resolve_backend(extension)
    if not backend.supports_snapshots():
        raise RuntimeError(f"Extension {extension.id} does not support restore")

    resolved_host = host or record.host
    if not resolved_host:
        resolved_host = resolve_host(profile)

    use_consumer = consumer or record.consumer
    if not use_consumer:
        raise ValueError("consumer required for restore (not stored on snapshot)")

    # The id stays "pending" until the backend returns a handle.
    now = utcnow()
    status = SandboxStatus(
        sandbox_id="pending",
        profile_id=record.profile_id,
        extension_id=record.extension_id,
        state=SandboxState.REQUESTED,
        consumer=use_consumer,
        host=resolved_host,
        inputs=dict(record.inputs),
        created_at=now,
        updated_at=now,
    )
    emit_lifecycle_event(status, event_type=event_type_for_state(status.state))

    status.state = SandboxState.PROVISIONING
    status.updated_at = utcnow()
    emit_lifecycle_event(status, event_type=event_type_for_state(status.state))

    # Spread the recorded handle first so the snapshot's own fields win: the
    # handle also carries a "host" key, which may be an empty string and must
    # not replace the host stored on the record.
    snapshot_meta = {
        **record.handle,
        "snapshot_id": record.snapshot_id,
        "artifact_path": record.artifact_path,
        "host": record.host,
    }

    try:
        # Hand the backend a copy so it cannot mutate the stored record.
        handle = backend.restore_from_snapshot(
            profile, snapshot_meta, dict(record.inputs), resolved_host
        )
        status.sandbox_id = handle["sandbox_id"]

        # Copy the same handle fields that create() persists. The two
        # provider fields matter here in particular: status.inputs started
        # as a copy of the snapshotted sandbox's inputs, so without this the
        # restored status would keep the old sandbox's provider identifiers.
        for key in (
            "compose_file",
            "ssh_user",
            "compose_cmd",
            "ssh_port",
            "vm_target",
            "vm_host",
            "endpoint",
            "provider_sandbox_id",
            "provider",
        ):
            status.inputs[key] = handle.get(key, "")
        status.inputs["restored_from"] = record.snapshot_id

        secret_bundle = resolve_setup_secrets(profile)
        if secret_bundle:
            handle["_secret_refs"] = secret_bundle

        reach = backend.wait_ready(handle)
        reach = enrich_reachability(reach, profile, handle)
        status.reachability = Reachability(**reach)

        status.state = SandboxState.READY
        status.ready_at = utcnow()
        status.updated_at = status.ready_at
        self._assign_ttl(status, profile, request_ttl=None)

        self.store.save(status)
        emit_lifecycle_event(
            status,
            summary=f"Sandbox restored from snapshot {snapshot_id}",
            event_type=event_type_for_state(status.state),
        )
        return status
    except Exception as exc:
        status.state = SandboxState.FAILED
        status.error = str(exc)
        status.updated_at = utcnow()
        # Nothing is stored while the placeholder id is still in use.
        if status.sandbox_id != "pending":
            self.store.save(status)
        emit_lifecycle_event(
            status,
            summary=f"Snapshot restore failed: {exc}",
            event_type=event_type_for_state(status.state),
        )
        raise