chore(consistency): sync task status from DB [auto]

Updated by fix-consistency on 2026-03-27:
  - update .custodian-brief.md for the-custodian
This commit is contained in:
2026-03-27 00:48:28 +01:00
parent 276196028a
commit d061c777d1
11 changed files with 760 additions and 3 deletions

97
e2e-framework/RUNBOOK.md Normal file
View File

@@ -0,0 +1,97 @@
# E2E Sandbox Framework — Runbook
## Prerequisites
**Workstation:**
- `ssh` + `rsync` available
- `python3` + `pyyaml` available (or `uv run`)
- State-hub running on `:8000` (for result reporting)
**Sandbox host (railiance01):**
- SSH key access
- Docker + docker compose plugin installed
- Sufficient disk for images (~4 GB for activity-core stack)
## First run
```bash
# Set sandbox host (once, or add to ~/.bashrc / .env)
export RAILIANCE01_HOST=<ip-or-alias> # e.g. 92.205.130.254
export RAILIANCE01_USER=root # optional, default=root
export RAILIANCE01_KEY=~/.ssh/id_rsa # optional, uses ssh default otherwise
# From the-custodian:
make e2e REPO=activity-core
```
Output will show each step: rsync → compose up → health wait → tests → compose down.
Exit code is 0 (all passed) or 1 (any failure).
## Options
```bash
# Keep sandbox alive after run (for debugging)
make e2e REPO=activity-core KEEP=1
# Override host without env var
make e2e REPO=activity-core HOST=192.168.1.50
# Attach result to a specific state-hub workstream
make e2e REPO=activity-core WORKSTREAM_ID=<uuid>
# Skip posting to state-hub
cd the-custodian && python3 -m e2e_framework ~/activity-core --no-report
```
## Adding a new repo
1. Create `<repo>/e2e/e2e.yml`:
```yaml
name: <repo-slug>
compose_file: docker-compose.dev.yml # or e2e/compose.yml
health_checks:
- name: <service>
url: http://localhost:<port>
timeout: 120
test_command: uv run python -m pytest e2e/tests/ -v
timeout: 300
cleanup: always
```
2. Add `<repo>/e2e/tests/test_*.py` — test scripts that exit 0 on success.
3. Run: `make e2e REPO=<repo>`
## Troubleshooting
**Sandbox not cleaned up:**
```bash
ssh root@$RAILIANCE01_HOST 'ls /tmp/custodian-e2e/'
ssh root@$RAILIANCE01_HOST 'docker compose ls'
# Manually clean:
ssh root@$RAILIANCE01_HOST 'docker compose -p e2e-activity-core-<id> down -v; rm -rf /tmp/custodian-e2e/<id>'
```
**Temporal startup slow (>2 min):**
Elasticsearch takes 60–90 seconds to start. The health check waits up to 180s.
If it times out, check:
```bash
ssh root@$RAILIANCE01_HOST 'docker logs temporal-elasticsearch | tail -20'
```
**Worker fails to start:**
Check that `uv` is installed on the sandbox host:
```bash
ssh root@$RAILIANCE01_HOST 'which uv || curl -LsSf https://astral.sh/uv/install.sh | sh'
```
**rsync excluded paths:**
`.git`, `__pycache__`, `*.pyc`, `.venv`, `node_modules` are excluded.
This means `uv sync` runs on the remote after rsync (handled by `uv run`).
## Architecture notes
- Sandbox isolation: docker compose project name `e2e-{repo}-{sandbox_id}`
- Sandbox dir: `/tmp/custodian-e2e/{sandbox_id}/`
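- Network isolation: each sandbox runs as its own docker compose project and so gets its own docker network. Ports that the compose file publishes on the host (the ones the `localhost` health checks use) are still shared and can collide.
- Parallel runs of the same repo get separate projects and directories (different sandbox_id); they are only safe if the compose file does not publish fixed host ports.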

View File

@@ -0,0 +1 @@
"""Custodian cross-repo e2e sandbox framework."""

View File

@@ -0,0 +1,2 @@
from .cli import main
# Start the CLI only when this module is executed (`python -m <package>`);
# a plain import must not kick off an e2e run as a side effect.
if __name__ == "__main__":
    main()

77
e2e-framework/cli.py Normal file
View File

@@ -0,0 +1,77 @@
"""
Entry point: python -m e2e_framework <repo-path> [options]
Usage:
python -m e2e_framework ~/activity-core
python -m e2e_framework ~/activity-core --host 92.205.130.254
python -m e2e_framework ~/activity-core --host railiance01 --keep
make e2e REPO=activity-core (from the-custodian/)
"""
from __future__ import annotations
import argparse
import os
import sys
from pathlib import Path
from .runner import run_e2e
from .reporter import report
def main() -> None:
    """Parse CLI arguments, run the e2e lifecycle, and exit with its status.

    Exit code is 0 when the run passed and 1 otherwise. Usage errors (no
    sandbox host, repo path missing, e2e config file not found) are printed
    to stderr and also exit with 1. Unless ``--no-report`` is given, the
    result is passed to ``report`` before exiting.
    """

    def fail(message: str) -> None:
        # Diagnostics belong on stderr so stdout carries only run output.
        print(f"ERROR: {message}", file=sys.stderr)
        sys.exit(1)

    parser = argparse.ArgumentParser(description="Run e2e tests in a remote sandbox")
    parser.add_argument("repo_path", help="Path to the repo containing e2e/e2e.yml")
    parser.add_argument(
        "--host",
        default=os.environ.get("RAILIANCE01_HOST", ""),
        help="Sandbox host (SSH alias or IP). Env: RAILIANCE01_HOST",
    )
    parser.add_argument(
        "--user",
        default=os.environ.get("RAILIANCE01_USER", "root"),
        help="SSH user (default: root). Env: RAILIANCE01_USER",
    )
    parser.add_argument(
        "--key",
        default=os.environ.get("RAILIANCE01_KEY"),
        help="Path to SSH private key. Env: RAILIANCE01_KEY",
    )
    parser.add_argument(
        "--keep",
        action="store_true",
        help="Keep sandbox after run (skip compose down + dir removal)",
    )
    parser.add_argument(
        "--workstream-id",
        default=None,
        help="State-hub workstream ID to attach the progress event to",
    )
    parser.add_argument(
        "--no-report",
        action="store_true",
        help="Skip posting results to state-hub",
    )
    args = parser.parse_args()

    if not args.host:
        fail("sandbox host required. Set RAILIANCE01_HOST or pass --host.")

    repo_path = Path(args.repo_path).expanduser().resolve()
    if not repo_path.exists():
        fail(f"repo path does not exist: {repo_path}")

    try:
        result = run_e2e(
            repo_path=repo_path,
            host=args.host,
            ssh_user=args.user,
            ssh_key=args.key,
            keep=args.keep,
        )
    except FileNotFoundError as exc:
        # The config loader raises this before any sandbox exists when
        # <repo>/e2e/e2e.yml is missing; show the message, not a traceback.
        fail(str(exc))

    if not args.no_report:
        report(result, workstream_id=args.workstream_id)

    sys.exit(0 if result.passed else 1)

50
e2e-framework/reporter.py Normal file
View File

@@ -0,0 +1,50 @@
"""
Push e2e run results to the state-hub as a progress event.
"""
from __future__ import annotations
import json
import urllib.request
import urllib.error
from .runner import RunResult
# Base URL of the state-hub that receives progress events.
STATE_HUB_URL = "http://127.0.0.1:8000"


def report(result: RunResult, workstream_id: str | None = None) -> bool:
    """POST the run result to the state-hub as a progress event.

    Reporting is best-effort: any transport failure is printed as a warning
    and turned into a ``False`` return so that it never masks the outcome of
    the e2e run itself.

    Args:
        result: Outcome of the e2e run to publish.
        workstream_id: Optional workstream to attach the event to; omitted
            from the payload when falsy.

    Returns:
        True if the state-hub accepted the request, False otherwise.
    """
    body = {
        "event_type": "e2e_result",
        "repo": result.repo,
        "sandbox_id": result.sandbox_id,
        "passed": result.passed,
        "exit_code": result.exit_code,
        "duration_s": round(result.duration_s, 1),
    }
    if result.error:
        body["error"] = result.error

    verdict = "PASSED" if result.passed else "FAILED"
    payload = {
        "summary": (
            f"E2E {verdict}: {result.repo} "
            f"(sandbox={result.sandbox_id}, {result.duration_s:.0f}s)"
        ),
        "details": json.dumps(body),
        "event_type": "e2e_result",
    }
    if workstream_id:
        payload["workstream_id"] = workstream_id

    req = urllib.request.Request(
        f"{STATE_HUB_URL}/progress/",
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=10) as resp:
            print(f"[reporter] progress event recorded (status={resp.status})")
            return True
    except urllib.error.HTTPError as exc:
        # The hub was reachable but answered with an error status; say so
        # instead of claiming it could not be reached.
        print(
            f"[reporter] WARNING: state-hub rejected progress event "
            f"(status={exc.code}): {exc.reason}"
        )
        exc.close()
        return False
    except OSError as exc:
        # URLError is an OSError subclass; catching OSError also covers read
        # timeouts and dropped connections, which urlopen does not wrap.
        print(f"[reporter] WARNING: could not reach state-hub: {exc}")
        return False

131
e2e-framework/runner.py Normal file
View File

@@ -0,0 +1,131 @@
"""
Full e2e lifecycle: up → health-wait → test → down → result.
"""
from __future__ import annotations
import time
from dataclasses import dataclass
from pathlib import Path
from .sandbox import Sandbox
from .schema import E2EConfig
@dataclass
class RunResult:
    """Outcome of a single e2e run as produced by ``run_e2e``."""

    sandbox_id: str  # id of the sandbox the run used
    repo: str  # config name of the repo under test
    passed: bool  # True only when the test command exited with 0
    exit_code: int  # test command exit code; -1 when the run aborted on an error
    duration_s: float  # elapsed time of the run in seconds
    output: str  # newline-joined runner log and captured command output
    error: str = ""  # message of the exception that aborted the run, if any
def run_e2e(
    repo_path: Path,
    host: str,
    ssh_user: str = "root",
    ssh_key: str | None = None,
    keep: bool = False,
) -> RunResult:
    """Run the full e2e lifecycle for one repo and return its result.

    Steps: load the repo's e2e config, provision a sandbox on ``host``,
    ``docker compose up``, wait for every health check, run the test command,
    then clean up.

    Args:
        repo_path: Local repo root containing ``e2e/e2e.yml``.
        host: Sandbox host (SSH alias or IP).
        ssh_user: SSH user name.
        ssh_key: Path to an SSH private key, or None for the ssh default.
        keep: When True, skip cleanup after the run.

    Returns:
        A ``RunResult``. Failures inside the lifecycle (provisioning, compose,
        health checks, command timeouts) are reported as ``passed=False`` with
        ``exit_code=-1`` and ``error`` set rather than raised.

    Raises:
        Whatever ``E2EConfig.load`` raises; the config is loaded before any
        sandbox exists, so there is nothing to clean up in that case.
    """
    # Local import: shlex is not among this module's top-level imports.
    import shlex

    config = E2EConfig.load(repo_path)
    sandbox = Sandbox(
        host=host,
        repo_path=repo_path,
        ssh_user=ssh_user,
        ssh_key=ssh_key,
    )
    project_name = f"e2e-{config.name}-{sandbox.sandbox_id}"
    compose_path = f"{sandbox.remote_dir}/{config.compose_file}"
    remote_dir_q = shlex.quote(sandbox.remote_dir)

    # Monotonic clock: the duration must not jump if the wall clock is adjusted.
    started = time.monotonic()
    output_lines: list[str] = []
    passed = False

    def log(msg: str) -> None:
        print(msg)
        output_lines.append(msg)

    log(f"\n{'='*60}")
    log(f"E2E run: {config.name} sandbox={sandbox.sandbox_id}")
    log(f"Host: {host} project: {project_name}")
    log(f"{'='*60}\n")

    try:
        # 1. Provision
        sandbox.provision()

        # 2. docker compose up
        # NOTE(review): config.env is parsed from e2e.yml but not applied to
        # any command yet; the previous code built "-e K=V" flags and never
        # used them. Decide how env should reach compose / the test command.
        up_cmd = (
            f"cd {remote_dir_q} && "
            f"docker compose -p {shlex.quote(project_name)} "
            f"-f {shlex.quote(compose_path)} up -d"
        )
        log(f"[runner] docker compose up ({project_name})")
        rc, out = sandbox.run(up_cmd, timeout=180, stream=False)
        output_lines.append(out)
        if rc != 0:
            raise RuntimeError(f"docker compose up failed (exit {rc}):\n{out}")

        # 3. Health checks
        for hc in config.health_checks:
            if not sandbox.wait_for_url(hc.url, timeout=hc.timeout):
                raise RuntimeError(f"Health check failed: {hc.name} ({hc.url})")

        # 4. Run tests (test_command is a shell command line by contract,
        # so it is passed through unquoted).
        log(f"\n[runner] running: {config.test_command}")
        test_cmd = f"cd {remote_dir_q} && {config.test_command}"
        rc, test_out = sandbox.run(test_cmd, timeout=config.timeout, stream=False)
        output_lines.append(test_out)
        print(test_out)

        passed = rc == 0
        duration = time.monotonic() - started
        log(f"\n[runner] {'PASSED' if passed else 'FAILED'} (exit={rc}, {duration:.1f}s)")
        return RunResult(
            sandbox_id=sandbox.sandbox_id,
            repo=config.name,
            passed=passed,
            exit_code=rc,
            duration_s=duration,
            output="\n".join(output_lines),
        )
    except Exception as exc:
        duration = time.monotonic() - started
        log(f"\n[runner] ERROR: {exc}")
        return RunResult(
            sandbox_id=sandbox.sandbox_id,
            repo=config.name,
            passed=False,
            exit_code=-1,
            duration_s=duration,
            output="\n".join(output_lines),
            error=str(exc),
        )
    finally:
        # cleanup="on_success" keeps a failed sandbox around for debugging.
        skip_cleanup = keep or (config.cleanup == "on_success" and not passed)
        try:
            _compose_down(sandbox, project_name, compose_path, config, skip_cleanup)
        except Exception as exc:
            # Cleanup is best-effort: an exception escaping this finally block
            # would replace the RunResult being returned.
            print(f"[runner] WARNING: cleanup failed (manual cleanup may be needed): {exc}")
def _compose_down(
    sandbox: Sandbox,
    project_name: str,
    compose_path: str,
    config: E2EConfig,
    keep: bool,
) -> None:
    """Stop the compose project and remove the sandbox directory.

    Does nothing (apart from printing a notice) when ``keep`` is true or the
    config's ``cleanup`` is ``"never"``.

    Args:
        sandbox: Sandbox whose remote directory hosts the compose project.
        project_name: docker compose project name used for ``up``.
        compose_path: Remote path of the compose file.
        config: Loaded e2e config; only ``cleanup`` is read here.
        keep: Caller's request to leave the sandbox in place.
    """
    # Local import: shlex is not among this module's top-level imports.
    import shlex

    if keep or config.cleanup == "never":
        print(f"[runner] skipping cleanup (keep={keep}, cleanup={config.cleanup})")
        return

    print(f"[runner] docker compose down ({project_name})")
    down_cmd = (
        f"cd {shlex.quote(sandbox.remote_dir)} && "
        f"docker compose -p {shlex.quote(project_name)} "
        f"-f {shlex.quote(compose_path)} down -v --remove-orphans 2>&1 || true"
    )
    try:
        sandbox.run(down_cmd, timeout=60)
    finally:
        # Remove the sandbox directory even when `compose down` raised
        # (e.g. timed out); otherwise the directory would be leaked.
        sandbox.teardown()

108
e2e-framework/sandbox.py Normal file
View File

@@ -0,0 +1,108 @@
"""
SSH-based sandbox: provision an isolated directory on the remote host,
rsync the repo into it, and run arbitrary commands there.
"""
from __future__ import annotations
import subprocess
import sys
import time
import uuid
from dataclasses import dataclass, field
from pathlib import Path
@dataclass
class Sandbox:
    """A per-run directory on a remote host, driven over SSH.

    Each instance owns ``{base_dir}/{sandbox_id}`` on ``host``.
    :meth:`provision` creates that directory and rsyncs ``repo_path`` into it,
    :meth:`run` executes shell commands on the host, :meth:`wait_for_url`
    polls a URL from the host, and :meth:`teardown` removes the directory.
    """

    host: str
    repo_path: Path
    # First 8 characters of a random UUID; unique per instance by default.
    sandbox_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    ssh_user: str = "root"
    ssh_key: str | None = None  # path to private key; None = ssh default
    base_dir: str = "/tmp/custodian-e2e"

    @property
    def remote_dir(self) -> str:
        """Path of this sandbox's directory on the remote host."""
        return f"{self.base_dir}/{self.sandbox_id}"

    @property
    def ssh_target(self) -> str:
        """``user@host`` string handed to ssh and rsync."""
        return f"{self.ssh_user}@{self.host}"

    # ── low-level helpers ────────────────────────────────────────────────────

    def _ssh_options(self) -> list[str]:
        """Return the ssh options shared by direct ssh calls and rsync."""
        options = [
            "-o", "StrictHostKeyChecking=no",
            "-o", "BatchMode=yes",  # never block on an interactive prompt
            "-o", "ConnectTimeout=15",
        ]
        if self.ssh_key:
            options += ["-i", self.ssh_key]
        return options

    def _ssh_args(self) -> list[str]:
        """Return the ssh argv prefix (program, options, target)."""
        return ["ssh", *self._ssh_options(), self.ssh_target]

    def run(self, cmd: str, *, timeout: int = 60, stream: bool = False) -> tuple[int, str]:
        """Run a shell command on the remote host.

        Args:
            cmd: Shell command line executed by the remote shell.
            timeout: Seconds to wait before giving up.
            stream: When True, forward output to this process's stdout/stderr
                instead of capturing it.

        Returns:
            ``(exit_code, output)`` where output is stdout followed by stderr,
            or an empty string when ``stream`` is True.

        Raises:
            subprocess.TimeoutExpired: If the command exceeds ``timeout``.
        """
        full_cmd = self._ssh_args() + [cmd]
        if not stream:
            result = subprocess.run(full_cmd, capture_output=True, text=True, timeout=timeout)
            return result.returncode, result.stdout + result.stderr

        proc = subprocess.Popen(full_cmd, text=True,
                                stdout=sys.stdout, stderr=sys.stderr)
        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            # Popen.wait() does not stop the child on timeout (unlike
            # subprocess.run); kill and reap it so no ssh process lingers.
            proc.kill()
            proc.wait()
            raise
        return proc.returncode, ""

    # ── lifecycle ────────────────────────────────────────────────────────────

    def provision(self) -> None:
        """Create the remote sandbox directory and rsync the repo into it.

        Raises:
            RuntimeError: If the directory cannot be created or rsync fails.
            subprocess.TimeoutExpired: If ssh or rsync exceeds its timeout.
        """
        # Local import: shlex is not among this module's top-level imports.
        import shlex

        print(f"[sandbox] provisioning {self.remote_dir} on {self.host}")
        rc, out = self.run(f"mkdir -p {shlex.quote(self.remote_dir)}")
        if rc != 0:
            raise RuntimeError(f"Failed to create remote sandbox dir: {out}")

        rsync_args = [
            "rsync", "-az", "--delete",
            "--exclude=.git",
            "--exclude=__pycache__",
            "--exclude=*.pyc",
            "--exclude=.venv",
            "--exclude=node_modules",
            # Same ssh options as run(), quoted so a key path containing
            # spaces survives rsync's splitting of the -e command.
            "-e", shlex.join(["ssh", *self._ssh_options()]),
            f"{self.repo_path}/",
            f"{self.ssh_target}:{self.remote_dir}/",
        ]
        print(f"[sandbox] rsyncing {self.repo_path} → remote:{self.remote_dir}")
        result = subprocess.run(rsync_args, capture_output=True, text=True, timeout=120)
        if result.returncode != 0:
            raise RuntimeError(f"rsync failed: {result.stdout + result.stderr}")

    def teardown(self) -> None:
        """Remove the remote sandbox directory; a failure is only a warning."""
        # Local import: shlex is not among this module's top-level imports.
        import shlex

        print(f"[sandbox] tearing down {self.remote_dir}")
        rc, out = self.run(f"rm -rf {shlex.quote(self.remote_dir)}", timeout=30)
        if rc != 0:
            print(f"[sandbox] WARNING: teardown failed (manual cleanup may be needed): {out}")

    def wait_for_url(self, url: str, timeout: int = 120, interval: int = 5) -> bool:
        """Poll a URL from the remote host until curl succeeds or time runs out.

        The probe is ``curl -sf`` executed on the remote host, so success
        means curl exited with 0 for ``url``.

        Args:
            url: URL to probe, as seen from the remote host.
            timeout: Total seconds to keep polling.
            interval: Seconds to sleep between probes.

        Returns:
            True once a probe succeeds, False if the deadline passes first.
        """
        # Local import: shlex is not among this module's top-level imports.
        import shlex

        print(f"[sandbox] waiting for {url} (timeout={timeout}s)")
        # Quote the URL: characters such as & or ? are special to the shell.
        probe = f"curl -sf --max-time 5 {shlex.quote(url)} > /dev/null 2>&1"
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                rc, _ = self.run(probe, timeout=15)
            except subprocess.TimeoutExpired:
                rc = -1  # a hung probe counts as "not up yet", not a crash
            if rc == 0:
                print(f"[sandbox] {url} is up")
                return True
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Never sleep past the deadline.
            time.sleep(min(interval, remaining))
        print(f"[sandbox] TIMEOUT waiting for {url}")
        return False

61
e2e-framework/schema.py Normal file
View File

@@ -0,0 +1,61 @@
"""
Parse and validate e2e.yml — the per-repo test contract.
"""
from __future__ import annotations
import yaml
from dataclasses import dataclass, field
from pathlib import Path
from typing import Literal
@dataclass
class HealthCheck:
    """One URL that must become reachable before the tests are started."""

    name: str  # label shown when the check fails
    url: str  # URL that is polled
    timeout: int = 120  # seconds
@dataclass
class E2EConfig:
    """Per-repo e2e test contract, parsed from ``<repo>/e2e/e2e.yml``."""

    name: str
    compose_file: str  # relative to repo root
    test_command: str
    health_checks: list[HealthCheck] = field(default_factory=list)
    timeout: int = 300  # hard limit for test_command
    cleanup: Literal["always", "on_success", "never"] = "always"
    env: dict[str, str] = field(default_factory=dict)

    @classmethod
    def load(cls, repo_root: Path) -> "E2EConfig":
        """Load and validate ``e2e/e2e.yml`` below ``repo_root``.

        Raises:
            FileNotFoundError: If the config file does not exist.
            ValueError: If the file is empty, is not a mapping at the top
                level, or contains an invalid ``cleanup`` / ``env`` value.
            KeyError: If a required key is missing.
        """
        config_path = repo_root / "e2e" / "e2e.yml"
        if not config_path.exists():
            raise FileNotFoundError(f"No e2e.yml found at {config_path}")
        raw = yaml.safe_load(config_path.read_text(encoding="utf-8"))
        # safe_load returns None for an empty file and may return a list or a
        # scalar; only a mapping is a usable config.
        if not isinstance(raw, dict):
            raise ValueError(f"{config_path} must contain a YAML mapping at the top level")
        return cls.from_dict(raw)

    @classmethod
    def from_dict(cls, raw: dict) -> "E2EConfig":
        """Build a config from an already-parsed mapping.

        ``env`` may be a mapping (``{KEY: value}``) or a list whose items are
        ``{key: K, value: V}`` dicts, plain ``{K: V}`` dicts, or ``"K=V"``
        strings. All env keys and values are converted to strings.

        Raises:
            KeyError: If ``name``, ``compose_file``, ``test_command`` or a
                health check's ``url`` is missing.
            ValueError: If ``cleanup`` is not one of the allowed values or an
                ``env`` entry has an unsupported shape.
        """
        health_checks = [
            HealthCheck(
                name=hc.get("name", hc["url"]),
                url=hc["url"],
                timeout=int(hc.get("timeout", 120)),
            )
            # `or []`: the key may be present with a null value.
            for hc in raw.get("health_checks") or []
        ]

        env: dict[str, str] = {}
        raw_env = raw.get("env") or []
        if isinstance(raw_env, dict):
            # Mapping form; iterating it as a list would yield bare keys and
            # drop every entry.
            env.update((str(k), str(v)) for k, v in raw_env.items())
        else:
            for item in raw_env:
                if isinstance(item, dict) and "key" in item:
                    env[str(item["key"])] = str(item.get("value", ""))
                elif isinstance(item, dict):
                    env.update((str(k), str(v)) for k, v in item.items())
                elif isinstance(item, str) and item.partition("=")[1] and item.partition("=")[0]:
                    key, _, value = item.partition("=")
                    env[key] = value
                else:
                    raise ValueError(f"Unsupported env entry in e2e.yml: {item!r}")

        cleanup = raw.get("cleanup", "always")
        if cleanup not in ("always", "on_success", "never"):
            raise ValueError(
                f"Invalid cleanup value {cleanup!r}; expected 'always', 'on_success' or 'never'"
            )

        return cls(
            name=raw["name"],
            compose_file=raw["compose_file"],
            test_command=raw["test_command"],
            health_checks=health_checks,
            timeout=int(raw.get("timeout", 300)),
            cleanup=cleanup,
            env=env,
        )