Implement SAND-WP-0002 meta-framework foundation (T01–T09)

Add meta-framework spec, pydantic schemas, profile/extension YAML, extension
registry, ext.compose-ssh backend, SandboxManager with State Hub events, CLI
commands, integration docs, capability registry entry, and compose-e2e runbook.
Nine unit tests pass. T10 remote smoke test remains for operator.
This commit is contained in:
2026-06-22 23:27:31 +02:00
parent b0a57cf9d3
commit d6d3155792
28 changed files with 1796 additions and 15 deletions

View File

@@ -1,8 +1,15 @@
"""CLI entry point (bootstrap stub; expanded in SAND-WP-0002)."""
"""sand-boxer CLI — sandbox establishment surface (v0)."""
from __future__ import annotations
import json
from typing import Annotated
import typer
from sandboxer import __version__
from sandboxer.core.manager import SandboxManager
from sandboxer.models import ActorType, Consumer, SandboxCreateRequest
app = typer.Typer(
name="sandboxer",
@@ -22,5 +29,90 @@ def version() -> None:
typer.echo(__version__)
def _parse_inputs(values: list[str]) -> dict[str, str]:
inputs: dict[str, str] = {}
for item in values:
if "=" not in item:
raise typer.BadParameter(f"Expected key=value, got: {item}")
key, val = item.split("=", 1)
inputs[key] = val
return inputs
def _print_status(status: object) -> None:
typer.echo(json.dumps(status, default=str, indent=2))
@app.command("create")
def sandbox_create(
profile: Annotated[str, typer.Option("--profile", help="Profile id")],
input: Annotated[
list[str] | None,
typer.Option("--input", help="Input key=value (repeatable)"),
] = None,
actor: Annotated[str, typer.Option(help="Consumer actor type")] = "adm",
project: Annotated[str, typer.Option(help="Calling project id")] = "sand-boxer",
host: Annotated[str | None, typer.Option(help="Override placement host")] = None,
) -> None:
"""Provision a sandbox from a profile."""
request = SandboxCreateRequest(
profile=profile,
inputs=_parse_inputs(input or []),
consumer=Consumer(actor=ActorType(actor), project=project),
)
manager = SandboxManager()
try:
status = manager.create(request, host=host)
except Exception as exc:
typer.echo(f"Error: {exc}", err=True)
raise typer.Exit(code=1) from exc
_print_status(status.model_dump(mode="json"))
@app.command("get")
def sandbox_get(sandbox_id: str) -> None:
"""Get sandbox status by id."""
status = SandboxManager().get(sandbox_id)
if not status:
typer.echo(f"Sandbox not found: {sandbox_id}", err=True)
raise typer.Exit(code=1)
_print_status(status.model_dump(mode="json"))
@app.command("list")
def sandbox_list(
state: Annotated[str | None, typer.Option(help="Filter by state")] = None,
) -> None:
"""List known sandboxes."""
items = SandboxManager().list()
if state:
items = [s for s in items if s.state.value == state]
_print_status([s.model_dump(mode="json") for s in items])
@app.command("destroy")
def sandbox_destroy(sandbox_id: str) -> None:
"""Destroy a sandbox (idempotent)."""
manager = SandboxManager()
try:
status = manager.destroy(sandbox_id)
except KeyError as exc:
typer.echo(str(exc), err=True)
raise typer.Exit(code=1) from exc
_print_status(status.model_dump(mode="json"))
@app.command("recreate")
def sandbox_recreate(sandbox_id: str) -> None:
"""Destroy and reprovision from stored inputs."""
manager = SandboxManager()
try:
status = manager.recreate(sandbox_id)
except (KeyError, Exception) as exc:
typer.echo(f"Error: {exc}", err=True)
raise typer.Exit(code=1) from exc
_print_status(status.model_dump(mode="json"))
if __name__ == "__main__":
app()

View File

@@ -0,0 +1 @@
"""Core sandbox establishment logic."""

View File

@@ -0,0 +1,122 @@
"""Core sandbox establishment logic — harness-agnostic."""
from __future__ import annotations
from sandboxer.extensions.registry import load_extension, resolve_backend
from sandboxer.lifecycle.state_hub import emit_lifecycle_event, event_type_for_state
from sandboxer.lifecycle.store import SandboxStore, utcnow
from sandboxer.models import (
Reachability,
SandboxCreateRequest,
SandboxState,
SandboxStatus,
)
from sandboxer.placement import resolve_host
from sandboxer.profiles.loader import load_profile
class SandboxManager:
def __init__(self, store: SandboxStore | None = None) -> None:
self.store = store or SandboxStore()
def create(self, request: SandboxCreateRequest, *, host: str | None = None) -> SandboxStatus:
profile = load_profile(request.profile)
extension = load_extension(profile.extension)
backend = resolve_backend(extension)
resolved_host = resolve_host(profile, override=host)
now = utcnow()
status = SandboxStatus(
sandbox_id="pending",
profile_id=profile.id,
extension_id=extension.id,
state=SandboxState.REQUESTED,
consumer=request.consumer,
host=resolved_host,
inputs=dict(request.inputs),
created_at=now,
updated_at=now,
)
emit_lifecycle_event(status, event_type=event_type_for_state(status.state))
status.state = SandboxState.PROVISIONING
status.updated_at = utcnow()
emit_lifecycle_event(status, event_type=event_type_for_state(status.state))
try:
handle = backend.provision(profile, request.inputs, resolved_host)
status.sandbox_id = handle["sandbox_id"]
status.inputs["compose_file"] = handle.get("compose_file", "")
status.inputs["ssh_user"] = handle.get("ssh_user", "")
reach = backend.wait_ready(handle)
status.reachability = Reachability(**reach)
status.state = SandboxState.READY
status.ready_at = utcnow()
status.updated_at = status.ready_at
self.store.save(status)
emit_lifecycle_event(status, event_type=event_type_for_state(status.state))
return status
except Exception as exc:
status.state = SandboxState.FAILED
status.error = str(exc)
status.updated_at = utcnow()
if status.sandbox_id != "pending":
self.store.save(status)
emit_lifecycle_event(
status,
summary=f"Sandbox provision failed: {exc}",
event_type=event_type_for_state(status.state),
)
raise
def get(self, sandbox_id: str) -> SandboxStatus | None:
return self.store.get(sandbox_id)
def list(self) -> list[SandboxStatus]:
return sorted(self.store.list_all(), key=lambda s: s.created_at, reverse=True)
def destroy(self, sandbox_id: str) -> SandboxStatus:
status = self.store.get(sandbox_id)
if not status:
raise KeyError(f"Sandbox not found: {sandbox_id}")
if status.state == SandboxState.DESTROYED:
return status
profile = load_profile(status.profile_id)
extension = load_extension(profile.extension)
backend = resolve_backend(extension)
status.state = SandboxState.DESTROYING
status.updated_at = utcnow()
self.store.save(status)
emit_lifecycle_event(status, event_type=event_type_for_state(status.state))
handle = {
"sandbox_id": status.sandbox_id,
"host": status.host or "",
"remote_dir": status.reachability.remote_dir if status.reachability else "",
"compose_project": status.reachability.compose_project if status.reachability else "",
"compose_file": status.inputs.get("compose_file", ""),
"ssh_user": status.inputs.get("ssh_user", ""),
}
backend.teardown(handle)
status.state = SandboxState.DESTROYED
status.destroyed_at = utcnow()
status.updated_at = status.destroyed_at
self.store.save(status)
emit_lifecycle_event(status, event_type=event_type_for_state(status.state))
return status
def recreate(self, sandbox_id: str) -> SandboxStatus:
existing = self.store.get(sandbox_id)
if not existing:
raise KeyError(f"Sandbox not found: {sandbox_id}")
request = SandboxCreateRequest(
profile=existing.profile_id,
inputs=dict(existing.inputs),
consumer=existing.consumer,
)
if existing.state != SandboxState.DESTROYED:
self.destroy(sandbox_id)
return self.create(request, host=existing.host)

View File

@@ -0,0 +1,123 @@
"""ext.compose-ssh — SSH + docker compose provisioning (e2e-framework lineage)."""
from __future__ import annotations
import uuid
from pathlib import Path
from typing import Any
import yaml
from sandboxer.extensions.ssh import SSHConfig
from sandboxer.models import Profile
class ComposeSSHExtension:
"""Provision isolated compose stacks on a remote host via SSH."""
def __init__(self, config: dict[str, Any] | None = None) -> None:
cfg = config or {}
self.base_dir: str = cfg.get("base_dir", "/tmp/sandboxer")
self.ssh_user: str = cfg.get("ssh_user", "root")
self.compose_timeout_s: int = int(cfg.get("compose_timeout_s", 180))
def provision(
self, profile: Profile, inputs: dict[str, str], host: str
) -> dict[str, str]:
repo = inputs.get("repo")
if not repo:
raise ValueError("inputs.repo is required for profile.compose-e2e")
repo_path = Path(repo).expanduser().resolve()
if not repo_path.exists():
raise FileNotFoundError(f"Repo path does not exist: {repo_path}")
sandbox_id = inputs.get("sandbox_id") or str(uuid.uuid4())[:8]
remote_dir = f"{self.base_dir}/{sandbox_id}"
ssh = SSHConfig.from_env(host, user=self.ssh_user)
rc, out = ssh.run(f"mkdir -p {remote_dir}")
if rc != 0:
raise RuntimeError(f"Failed to create remote dir: {out}")
ssh.rsync(repo_path, remote_dir)
compose_file = self._resolve_compose_file(repo_path)
project_name = f"sbx-{profile.id.split('.')[-1]}-{sandbox_id}"
up_cmd = (
f"cd {remote_dir} && "
f"docker compose -p {project_name} -f {compose_file} up -d"
)
rc, out = ssh.run(up_cmd, timeout=self.compose_timeout_s)
if rc != 0:
raise RuntimeError(f"docker compose up failed: {out}")
return {
"sandbox_id": sandbox_id,
"host": host,
"remote_dir": remote_dir,
"compose_project": project_name,
"compose_file": compose_file,
"repo": str(repo_path),
"ssh_user": ssh.user,
}
def wait_ready(self, handle: dict[str, str]) -> dict[str, str]:
"""Confirm compose services are running (no HTTP health polling)."""
ssh = SSHConfig.from_env(handle["host"], user=handle.get("ssh_user", self.ssh_user))
project = handle["compose_project"]
remote_dir = handle["remote_dir"]
compose_file = handle["compose_file"]
cmd = (
f"cd {remote_dir} && "
f"docker compose -p {project} -f {compose_file} ps --status running -q"
)
rc, out = ssh.run(cmd, timeout=60)
if rc != 0 or not out.strip():
raise RuntimeError(f"compose services not running: {out}")
return {
"ssh": ssh.target,
"remote_dir": remote_dir,
"compose_project": project,
"host": handle["host"],
}
def teardown(self, handle: dict[str, str]) -> dict[str, str]:
ssh = SSHConfig.from_env(handle["host"], user=handle.get("ssh_user", self.ssh_user))
project = handle.get("compose_project")
remote_dir = handle.get("remote_dir")
compose_file = handle.get("compose_file")
cleaned_compose = False
if project and remote_dir and compose_file:
down_cmd = (
f"cd {remote_dir} && "
f"docker compose -p {project} -f {compose_file} "
f"down -v --remove-orphans 2>&1 || true"
)
ssh.run(down_cmd, timeout=60)
cleaned_compose = True
cleaned_dir = False
if remote_dir:
rc, _ = ssh.run(f"rm -rf {remote_dir}", timeout=30)
cleaned_dir = rc == 0
return {
"compose_removed": str(cleaned_compose),
"remote_dir_removed": str(cleaned_dir),
"remote_dir": remote_dir or "",
}
def _resolve_compose_file(self, repo_path: Path) -> str:
e2e_yml = repo_path / "e2e" / "e2e.yml"
if e2e_yml.exists():
raw = yaml.safe_load(e2e_yml.read_text())
if raw and raw.get("compose_file"):
return raw["compose_file"]
for candidate in ("docker-compose.dev.yml", "docker-compose.yml", "compose.yml"):
if (repo_path / candidate).exists():
return candidate
raise FileNotFoundError(
f"No compose file found in {repo_path} (expected e2e/e2e.yml or docker-compose*.yml)"
)

View File

@@ -0,0 +1,74 @@
"""Extension discovery, validation, and handler resolution."""
from __future__ import annotations
import importlib
from pathlib import Path
from typing import Protocol
import yaml
from sandboxer.models import Extension, Profile
_REPO_ROOT = Path(__file__).resolve().parents[3]
_EXTENSIONS_DIR = _REPO_ROOT / "extensions"
_REQUIRED_CAPABILITY_FIELDS = ("isolation_levels", "pricing_model")
class ExtensionBackend(Protocol):
def provision(
self, profile: Profile, inputs: dict[str, str], host: str
) -> dict[str, str]: ...
def wait_ready(self, handle: dict[str, str]) -> dict[str, str]: ...
def teardown(self, handle: dict[str, str]) -> dict[str, str]: ...
def extensions_dir() -> Path:
return _EXTENSIONS_DIR
def _validate_extension_caps(ext: Extension) -> None:
caps = ext.capabilities
for field in _REQUIRED_CAPABILITY_FIELDS:
if not getattr(caps, field, None):
raise ValueError(f"Extension {ext.id} missing capability field: {field}")
if not ext.handler:
raise ValueError(f"Extension {ext.id} missing handler")
def load_extension(extension_id: str, *, extensions_root: Path | None = None) -> Extension:
root = extensions_root or _EXTENSIONS_DIR
path = root / f"{extension_id}.yaml"
if not path.exists():
raise FileNotFoundError(f"Extension not found: {extension_id} ({path})")
raw = yaml.safe_load(path.read_text())
ext = Extension.model_validate(raw)
if ext.id != extension_id:
raise ValueError(f"Extension id mismatch: file {extension_id}, content {ext.id}")
_validate_extension_caps(ext)
return ext
def load_all_extensions(*, extensions_root: Path | None = None) -> dict[str, Extension]:
root = extensions_root or _EXTENSIONS_DIR
extensions: dict[str, Extension] = {}
if not root.exists():
return extensions
for path in sorted(root.glob("*.yaml")):
ext = load_extension(path.stem, extensions_root=root)
if ext.id in extensions:
raise ValueError(f"Duplicate extension id: {ext.id}")
extensions[ext.id] = ext
return extensions
def resolve_backend(extension: Extension) -> ExtensionBackend:
module_path, _, attr = extension.handler.partition(":")
if not attr:
raise ValueError(f"Invalid handler for {extension.id}: {extension.handler}")
module = importlib.import_module(module_path)
cls = getattr(module, attr)
return cls(extension.config)

View File

@@ -0,0 +1,71 @@
"""SSH and rsync helpers for self-hosted extensions."""
from __future__ import annotations
import os
import subprocess
from dataclasses import dataclass
from pathlib import Path
@dataclass
class SSHConfig:
host: str
user: str = "root"
key: str | None = None
connect_timeout: int = 15
@property
def target(self) -> str:
return f"{self.user}@{self.host}"
@classmethod
def from_env(cls, host: str, *, user: str | None = None, key: str | None = None) -> SSHConfig:
return cls(
host=host,
user=user or os.environ.get("SANDBOXER_SSH_USER", "root"),
key=key or os.environ.get("SANDBOXER_SSH_KEY"),
)
def ssh_base(self) -> list[str]:
args = [
"ssh",
"-o",
"StrictHostKeyChecking=no",
"-o",
"BatchMode=yes",
"-o",
f"ConnectTimeout={self.connect_timeout}",
]
if self.key:
args += ["-i", self.key]
args.append(self.target)
return args
def run(self, cmd: str, *, timeout: int = 60) -> tuple[int, str]:
result = subprocess.run(
self.ssh_base() + [cmd],
capture_output=True,
text=True,
timeout=timeout,
)
return result.returncode, result.stdout + result.stderr
def rsync(self, local_path: Path, remote_dir: str, *, timeout: int = 120) -> None:
rsync_args = [
"rsync",
"-az",
"--delete",
"--exclude=.git",
"--exclude=__pycache__",
"--exclude=*.pyc",
"--exclude=.venv",
"--exclude=node_modules",
]
ssh_cmd = "ssh -o StrictHostKeyChecking=no"
if self.key:
ssh_cmd = f"ssh -i {self.key} -o StrictHostKeyChecking=no"
rsync_args += ["-e", ssh_cmd, f"{local_path}/", f"{self.target}:{remote_dir}/"]
result = subprocess.run(rsync_args, capture_output=True, text=True, timeout=timeout)
if result.returncode != 0:
raise RuntimeError(f"rsync failed: {result.stdout + result.stderr}")

View File

@@ -0,0 +1,62 @@
"""State Hub lifecycle event emission."""
from __future__ import annotations
import os
from typing import Any
import httpx
from sandboxer.models import SandboxState, SandboxStatus
_DEFAULT_HUB = "http://127.0.0.1:8000"
def hub_url() -> str:
return os.environ.get("STATE_HUB_URL", _DEFAULT_HUB)
def emit_lifecycle_event(
status: SandboxStatus,
*,
summary: str | None = None,
event_type: str = "note",
author: str = "sandboxer",
) -> dict[str, Any] | None:
if os.environ.get("SANDBOXER_NO_STATE_HUB", "").lower() in ("1", "true", "yes"):
return None
payload = {
"event_type": event_type,
"summary": summary or f"Sandbox {status.sandbox_id}{status.state.value}",
"author": author,
"detail": {
"sandbox_id": status.sandbox_id,
"profile_id": status.profile_id,
"extension_id": status.extension_id,
"host": status.host,
"consumer": status.consumer.model_dump(),
"actor_type": status.consumer.actor.value,
"state": status.state.value,
"reachability": status.reachability.model_dump() if status.reachability else None,
"timestamps": {
"created_at": status.created_at.isoformat(),
"updated_at": status.updated_at.isoformat(),
"ready_at": status.ready_at.isoformat() if status.ready_at else None,
"destroyed_at": status.destroyed_at.isoformat() if status.destroyed_at else None,
},
},
}
try:
response = httpx.post(f"{hub_url()}/progress/", json=payload, timeout=10.0)
response.raise_for_status()
return response.json()
except httpx.HTTPError:
return None
def event_type_for_state(state: SandboxState) -> str:
if state in (SandboxState.READY, SandboxState.DESTROYED):
return "milestone"
return "note"

View File

@@ -0,0 +1,52 @@
"""Persistent sandbox status store (JSON file)."""
from __future__ import annotations
import json
import os
from datetime import UTC, datetime
from pathlib import Path
from sandboxer.models import SandboxStatus
def _default_store_path() -> Path:
base = Path(os.environ.get("XDG_DATA_HOME", Path.home() / ".local" / "share"))
return base / "sandboxer" / "sandboxes.json"
class SandboxStore:
def __init__(self, path: Path | None = None) -> None:
self.path = path or _default_store_path()
self.path.parent.mkdir(parents=True, exist_ok=True)
def _read(self) -> dict[str, dict]:
if not self.path.exists():
return {}
return json.loads(self.path.read_text())
def _write(self, data: dict[str, dict]) -> None:
self.path.write_text(json.dumps(data, indent=2, default=str))
def save(self, status: SandboxStatus) -> None:
data = self._read()
data[status.sandbox_id] = status.model_dump(mode="json")
self._write(data)
def get(self, sandbox_id: str) -> SandboxStatus | None:
raw = self._read().get(sandbox_id)
if not raw:
return None
return SandboxStatus.model_validate(raw)
def list_all(self) -> list[SandboxStatus]:
return [SandboxStatus.model_validate(v) for v in self._read().values()]
def delete(self, sandbox_id: str) -> None:
data = self._read()
data.pop(sandbox_id, None)
self._write(data)
def utcnow() -> datetime:
return datetime.now(UTC)

147
src/sandboxer/models.py Normal file
View File

@@ -0,0 +1,147 @@
"""Pydantic schemas for profiles, extensions, and sandbox lifecycle."""
from __future__ import annotations
from datetime import datetime
from enum import StrEnum
from typing import Any, Literal
from pydantic import BaseModel, Field
class ActorType(StrEnum):
ADM = "adm"
AGT = "agt"
ATM = "atm"
class SandboxState(StrEnum):
REQUESTED = "requested"
PROVISIONING = "provisioning"
READY = "ready"
ACTIVE = "active"
EXPIRED = "expired"
FAILED = "failed"
DESTROYING = "destroying"
DESTROYED = "destroyed"
class RouteStrategy(StrEnum):
PREFER_SELF_HOSTED = "prefer-self-hosted"
LOWEST_COST = "lowest-cost"
LOWEST_LATENCY = "lowest-latency"
EXPLICIT = "explicit"
class Consumer(BaseModel):
actor: ActorType
project: str
session_id: str | None = None
run_id: str | None = None
class IsolationSpec(BaseModel):
level: Literal["container", "microvm", "policy"] = "container"
class NetworkSpec(BaseModel):
default: Literal["deny", "allow"] = "deny"
egress: list[str] = Field(default_factory=list)
class WorkspaceSpec(BaseModel):
mode: Literal["mirror", "remote-canonical"] = "remote-canonical"
access: Literal["none", "ro", "rw"] = "rw"
class TtlSpec(BaseModel):
default: str = "4h"
max: str = "24h"
idle_reap: str | None = None
class ResourceSpec(BaseModel):
cpu: str | None = None
memory_mb: int | None = None
class SetupSpec(BaseModel):
instructions: str = ""
secret_refs: list[str] = Field(default_factory=list)
class PlacementSpec(BaseModel):
prefer: list[str] = Field(default_factory=list)
fallback: list[str] = Field(default_factory=list)
class ReachabilitySpec(BaseModel):
tunnel: str = "ops-bridge"
identity: str = "ops-warden"
class ProfileMetadata(BaseModel):
cost_class: Literal["self-hosted", "saas-metered"] = "self-hosted"
latency_class: str = "standard"
class Profile(BaseModel):
id: str
version: str
extension: str
isolation: IsolationSpec = Field(default_factory=IsolationSpec)
network: NetworkSpec = Field(default_factory=NetworkSpec)
workspace: WorkspaceSpec = Field(default_factory=WorkspaceSpec)
scope_default: Literal["session", "agent", "shared"] = "session"
ttl: TtlSpec = Field(default_factory=TtlSpec)
resources: ResourceSpec = Field(default_factory=ResourceSpec)
setup: SetupSpec = Field(default_factory=SetupSpec)
placement: PlacementSpec = Field(default_factory=PlacementSpec)
reachability: ReachabilitySpec = Field(default_factory=ReachabilitySpec)
metadata: ProfileMetadata = Field(default_factory=ProfileMetadata)
class ExtensionCapabilities(BaseModel):
isolation_levels: list[str] = Field(default_factory=lambda: ["container"])
regions: list[str] = Field(default_factory=list)
persistence: bool = False
pricing_model: Literal["self-hosted", "metered"] = "self-hosted"
class Extension(BaseModel):
id: str
title: str
description: str = ""
handler: str
capabilities: ExtensionCapabilities = Field(default_factory=ExtensionCapabilities)
config: dict[str, Any] = Field(default_factory=dict)
class SandboxCreateRequest(BaseModel):
profile: str
inputs: dict[str, str] = Field(default_factory=dict)
consumer: Consumer
ttl: str | None = None
class Reachability(BaseModel):
ssh: str | None = None
remote_dir: str | None = None
compose_project: str | None = None
host: str | None = None
class SandboxStatus(BaseModel):
sandbox_id: str
profile_id: str
extension_id: str
state: SandboxState
consumer: Consumer
host: str | None = None
reachability: Reachability | None = None
inputs: dict[str, str] = Field(default_factory=dict)
error: str | None = None
created_at: datetime
updated_at: datetime
ready_at: datetime | None = None
destroyed_at: datetime | None = None

View File

@@ -0,0 +1,28 @@
"""Host placement resolution from profile policy and environment."""
from __future__ import annotations
import os
from sandboxer.models import Profile
def resolve_host(profile: Profile, *, override: str | None = None) -> str:
if override:
return override
env_host = os.environ.get("SANDBOXER_HOST")
if env_host:
return env_host
for candidate in [*profile.placement.prefer, *profile.placement.fallback]:
mapped = os.environ.get(f"SANDBOXER_HOST_{candidate.upper()}")
if mapped:
return mapped
if candidate in os.environ:
return os.environ[candidate]
if profile.placement.prefer:
return profile.placement.prefer[0]
if profile.placement.fallback:
return profile.placement.fallback[0]
raise ValueError(
"No host resolved. Set SANDBOXER_HOST or profile placement hosts in environment."
)

View File

@@ -0,0 +1,35 @@
"""Load and validate profile definitions from YAML files."""
from __future__ import annotations
from pathlib import Path
import yaml
from sandboxer.models import Profile
_REPO_ROOT = Path(__file__).resolve().parents[3]
_PROFILES_DIR = _REPO_ROOT / "profiles"
def profiles_dir() -> Path:
return _PROFILES_DIR
def load_profile(profile_id: str, *, profiles_root: Path | None = None) -> Profile:
root = profiles_root or _PROFILES_DIR
path = root / f"{profile_id}.yaml"
if not path.exists():
raise FileNotFoundError(f"Profile not found: {profile_id} ({path})")
raw = yaml.safe_load(path.read_text())
profile = Profile.model_validate(raw)
if profile.id != profile_id:
raise ValueError(f"Profile id mismatch: file {profile_id}, content {profile.id}")
return profile
def list_profile_ids(*, profiles_root: Path | None = None) -> list[str]:
root = profiles_root or _PROFILES_DIR
if not root.exists():
return []
return sorted(p.stem for p in root.glob("*.yaml"))