From a10bbd21625762741d1007f86eff7dd899d820d7 Mon Sep 17 00:00:00 2001 From: tegwick Date: Tue, 30 Jun 2026 15:24:10 +0200 Subject: [PATCH] =?UTF-8?q?feat(WARDEN-WP-0021):=20T3-T5=20=E2=80=94=20vis?= =?UTF-8?q?ibility,=20approve=20loop,=20runbook=20(scheduled=20worker=20co?= =?UTF-8?q?mplete)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit T4 (review→send loop): conservative tick persists structured drafts to state_dir/worker-drafts.json; `warden worker drafts` lists them, `warden worker approve [--body …]` sends the reviewed draft as the reply + marks read + drops it. Escalated plans persist no draft. Live-verified end-to-end. T3 (visibility): `warden worker status` (pending drafts, triage count, last digest, timer state); best-effort notify-send nudge in the tick when drafts are pending. T5: wiki/playbooks/scheduled-worker.md (enable/disable, the approve loop, failure modes, conservative-only posture) + SCOPE note. WARDEN-WP-0021 finished: the conservative worker now runs on a systemd --user timer (enabled, every 15 min), triages new inbox messages into drafts you approve with one command, degrades gracefully, and stops with one command. 249 tests, lint clean. Co-Authored-By: Claude Opus 4.8 --- SCOPE.md | 6 +- scripts/worker-tick.sh | 8 ++ src/warden/cli.py | 39 ++++++++ src/warden/worker.py | 98 +++++++++++++++++++ tests/test_worker.py | 40 ++++++++ wiki/playbooks/scheduled-worker.md | 60 ++++++++++++ ...EN-WP-0021-enable-scheduled-worker-tick.md | 8 +- 7 files changed, 253 insertions(+), 6 deletions(-) create mode 100644 wiki/playbooks/scheduled-worker.md diff --git a/SCOPE.md b/SCOPE.md index 760034c..77e4385 100644 --- a/SCOPE.md +++ b/SCOPE.md @@ -139,8 +139,10 @@ for the rest. - Coordination worker (`warden worker`, WP-0020) — autonomous triage of ops-warden's State Hub inbox via llm-connect. **Conservative by default** (triage + drafted replies, sends nothing); `--full-auto` opt-in. 
Four guardrails (fixed charter, action allowlist, - no-secret invariant, dry-run/audit) enforced regardless of the brain. Schedulable via - `scripts/worker-tick.sh` (ships disabled) + no-secret invariant, dry-run/audit) enforced regardless of the brain. **Scheduled** + (WP-0021) via a `systemd --user` timer (`scripts/install-worker-timer.sh`); review loop + `warden worker drafts | approve MESSAGE_ID` + `worker status`; one-command kill switch + (`wiki/playbooks/scheduled-worker.md`) - Runbooks for OpenBao config and Inter-Hub bootstrap SSH envelope ### Stewardship (documentation and alignment) diff --git a/scripts/worker-tick.sh b/scripts/worker-tick.sh index 5d2415c..3b76729 100755 --- a/scripts/worker-tick.sh +++ b/scripts/worker-tick.sh @@ -58,4 +58,12 @@ if ! LLM_CONNECT_URL="$LLM_URL" WARDEN_HUB_URL="$HUB_URL" \ uv run --directory "$ROOT" warden worker run --execute --brain "$BRAIN"; then echo "$(date -Is) tick: worker run returned non-zero; will retry next tick" fi + +# Best-effort desktop nudge when drafts are pending (needs a display; never fails the tick). 
+if command -v notify-send >/dev/null 2>&1; then + N="$(uv run --directory "$ROOT" warden worker drafts 2>/dev/null | grep -c '→' || true)" + if [[ "${N:-0}" -gt 0 ]]; then + notify-send "ops-warden worker" "$N draft(s) pending — run: warden worker drafts" 2>/dev/null || true + fi +fi exit 0 diff --git a/src/warden/cli.py b/src/warden/cli.py index 277fd70..da78a35 100644 --- a/src/warden/cli.py +++ b/src/warden/cli.py @@ -1212,3 +1212,42 @@ def worker_run( else: console.print("[green]Conservative triage[/green] — drafting; nothing sent to other agents.") console.print(run_conservative(plans, hub, topic_id=topic_id)) + + +@worker_app.command("drafts") +def worker_drafts() -> None: + """List the worker's pending drafted replies (from the conservative tier).""" + from warden.worker import list_drafts + console.print(list_drafts()) + + +@worker_app.command("approve") +def worker_approve( + message_id: Annotated[str, typer.Argument(help="Message id to send the drafted reply for")], + body: Annotated[ + Optional[str], typer.Option("--body", help="Override the drafted reply text before sending") + ] = None, +) -> None: + """Send a reviewed draft as the reply and mark the message read.""" + from warden.worker import HubClient, approve_draft + try: + console.print(approve_draft(message_id, HubClient(), body_override=body)) + except Exception as e: # noqa: BLE001 — surface transport errors cleanly + err.print(f"[red]Approve failed:[/red] {e}") + raise typer.Exit(1) + + +@worker_app.command("status") +def worker_status_cmd() -> None: + """Show worker state: pending drafts, triage count, last digest, timer status.""" + import subprocess + from warden.worker import worker_status + console.print(worker_status()) + try: + st = subprocess.run( + ["systemctl", "--user", "is-active", "ops-warden-worker.timer"], + capture_output=True, text=True, timeout=5, + ).stdout.strip() + console.print(f"timer : {st or 'unknown'}") + except Exception: # noqa: BLE001 — systemd may be absent 
(cron/other host) + console.print("timer : (systemd not available)") diff --git a/src/warden/worker.py b/src/warden/worker.py index 1fe53ef..c6b7f54 100644 --- a/src/warden/worker.py +++ b/src/warden/worker.py @@ -364,6 +364,92 @@ def save_seen(state_dir: Path, seen: set) -> None: (state_dir / "worker-seen.json").write_text(_json.dumps(sorted(seen))) +def _re_subject(subject: str) -> str: + return subject if subject.lower().startswith("re:") else f"Re: {subject}" + + +def _draftable_body(plan: WorkerPlan) -> Optional[str]: + """The reply text a plan would send, if any (route_answer or reply with a body).""" + for a in plan.actions: + if a.risk != "safe": + continue + if a.kind == "route_answer" and a.payload.get("answer"): + return a.payload["answer"] + if a.kind == "reply" and a.payload.get("body"): + return a.payload["body"] + return None + + +def load_drafts(state_dir: Path) -> dict: + import json as _json + + p = state_dir / "worker-drafts.json" + if not p.exists(): + return {} + try: + d = _json.loads(p.read_text()) + return d if isinstance(d, dict) else {} + except (ValueError, OSError): + return {} + + +def save_drafts(state_dir: Path, drafts: dict) -> None: + import json as _json + + (state_dir / "worker-drafts.json").write_text(_json.dumps(drafts, indent=2)) + + +def list_drafts(state_dir: Optional[Path] = None) -> str: + drafts = load_drafts(state_dir or default_state_dir()) + if not drafts: + return "no pending drafts." 
+ lines: List[str] = [] + for mid, d in drafts.items(): + lines.append(f"{mid} → {d.get('to_agent')}: {d.get('subject')}") + body = (d.get("body") or "").replace("\n", " ") + lines.append(f" {body[:140]}{'…' if len(body) > 140 else ''}") + return "\n".join(lines) + + +def approve_draft( + message_id: str, hub: HubClient, *, state_dir: Optional[Path] = None, + body_override: Optional[str] = None, +) -> str: + """Send a reviewed draft as the reply + mark the message read, then drop the draft.""" + state_dir = state_dir or default_state_dir() + drafts = load_drafts(state_dir) + d = drafts.get(message_id) + if not d: + return f"no pending draft for {message_id} (try `warden worker drafts`)." + hub.send_reply( + to_agent=d["to_agent"], subject=d["subject"], + body=body_override if body_override is not None else d["body"], + thread_id=d.get("thread_id"), + ) + hub.mark_read(message_id) + drafts.pop(message_id, None) + save_drafts(state_dir, drafts) + return f"sent reply to {d['to_agent']} ({d['subject']}) and marked read." + + +def worker_status(state_dir: Optional[Path] = None) -> str: + """Operator-facing state of the worker: drafts, triage count, digest location.""" + import datetime as _dt + + state_dir = state_dir or default_state_dir() + drafts = load_drafts(state_dir) + seen = load_seen(state_dir) + digest = state_dir / "worker-digest.md" + when = "—" + if digest.exists(): + when = _dt.datetime.fromtimestamp(digest.stat().st_mtime).strftime("%Y-%m-%d %H:%M:%S") + return "\n".join([ + f"pending drafts : {len(drafts)} (warden worker drafts | approve )", + f"triaged (seen) : {len(seen)}", + f"last digest : {when} {digest}", + ]) + + def build_digest(plans: List[WorkerPlan]) -> str: """Human-reviewable digest of proposed actions + drafted replies. 
Sends nothing.""" if not plans: @@ -403,6 +489,18 @@ def run_conservative( new = [p for p in plans if p.message_id and p.message_id not in seen] digest = build_digest(new) (state_dir / "worker-digest.md").write_text(digest + "\n") + # Persist structured drafts so `warden worker approve` can send a reviewed one. + drafts = load_drafts(state_dir) + for p in new: + if p.escalated: + continue + body = _draftable_body(p) + if body: + drafts[p.message_id] = { + "to_agent": p.from_agent, "subject": _re_subject(p.subject), + "body": body, "thread_id": p.raw.get("thread_id") or p.message_id, + } + save_drafts(state_dir, drafts) if new: n_esc = sum(1 for p in new if p.escalated) try: diff --git a/tests/test_worker.py b/tests/test_worker.py index 5985f6f..9032842 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -206,6 +206,46 @@ def test_run_conservative_drafts_no_sends_and_dedups(tmp_path): assert not any(c[0] == "progress" for c in hub2.calls) +# --- approve loop (WP-0021 T4) ------------------------------------------------ + +def test_conservative_persists_draft_and_approve_sends(tmp_path): + from warden.worker import approve_draft, list_drafts, load_drafts + hub = _FakeHub() + p = _plan([PlannedAction(kind="route_answer", summary="a", payload={"answer": "the answer"})]) + run_conservative([p], hub, state_dir=tmp_path) + drafts = load_drafts(tmp_path) + assert "m1" in drafts and drafts["m1"]["body"] == "the answer" + assert "m1" in list_drafts(tmp_path) + # approve → sends the reply + marks read + drops the draft + hub2 = _FakeHub() + out = approve_draft("m1", hub2, state_dir=tmp_path) + assert any(c[0] == "reply" and c[3] == "the answer" for c in hub2.calls) + assert any(c[0] == "mark_read" for c in hub2.calls) + assert "m1" not in load_drafts(tmp_path) + assert "sent reply" in out + + +def test_approve_body_override(tmp_path): + from warden.worker import approve_draft, save_drafts + save_drafts(tmp_path, {"m9": {"to_agent": "bob", "subject": "Re: x", 
"body": "orig", "thread_id": "t"}}) + hub = _FakeHub() + approve_draft("m9", hub, state_dir=tmp_path, body_override="edited") + assert any(c[0] == "reply" and c[3] == "edited" for c in hub.calls) + + +def test_approve_missing_draft(tmp_path): + from warden.worker import approve_draft + out = approve_draft("nope", _FakeHub(), state_dir=tmp_path) + assert "no pending draft" in out + + +def test_escalated_plan_persists_no_draft(tmp_path): + a = PlannedAction(kind="reply", summary="x", risk="escalate", reason="secret") + run_conservative([_plan([a])], _FakeHub(), state_dir=tmp_path) + from warden.worker import load_drafts + assert load_drafts(tmp_path) == {} + + # --- executor (T3) ----------------------------------------------------------- class _FakeHub: diff --git a/wiki/playbooks/scheduled-worker.md b/wiki/playbooks/scheduled-worker.md new file mode 100644 index 0000000..12b50e6 --- /dev/null +++ b/wiki/playbooks/scheduled-worker.md @@ -0,0 +1,60 @@ +# Scheduled coordination worker + +Date: 2026-06-30 · Workplan: WARDEN-WP-0021 · Code: WARDEN-WP-0020 + +The ops-warden worker triages its State Hub inbox on a schedule and drafts replies you +approve. **Conservative tier only** — it never auto-sends to other agents and never marks a +message read on its own (build-stage decision `813899f9`). The four guardrails (fixed +charter, action allowlist, no-secret invariant, dry-run/audit) hold every run. + +## Enable / disable + +```bash +./scripts/install-worker-timer.sh --enable # install + start (systemd --user, every 15 min) +systemctl --user disable --now ops-warden-worker.timer # kill switch +# or, leave the timer but pause every run: +echo 'WORKER_ENABLED=0' >> ~/.config/warden/worker.env +``` +No systemd? 
Cron fallback: +``` +*/15 * * * * /home/worsch/ops-warden/scripts/worker-tick.sh >> ~/.local/state/warden/worker-tick.log 2>&1 +``` + +## The loop + +```bash +warden worker status # pending drafts, triage count, last digest, timer state +warden worker drafts # list drafted replies awaiting your OK +warden worker approve MESSAGE_ID # send a draft as your reply + mark read +warden worker approve MESSAGE_ID --body "…" # edit before sending +``` +Each tick writes `~/.local/state/warden/worker-digest.md` and posts one progress note; a +desktop `notify-send` fires when drafts are pending (if a display is present). + +## Config (`~/.config/warden/worker.env`) + +| Var | Meaning | +| --- | --- | +| `WARDEN_HUB_URL` | State Hub (default `http://127.0.0.1:8000`; railiance01 after cust-wp-0011) | +| `WORKER_BRAIN` | `llm` (llm-connect) or `rule` (offline fallback) | +| `WORKER_ENABLED` | `0` pauses every tick without touching the timer | +| `LLM_CONNECT_URL` | set to skip the per-tick kubectl port-forward to llm-connect | + +## Failure modes (all graceful) + +- **State Hub unreachable** → the tick `/state/health`-prechecks and skips cleanly (exit 0). +- **llm-connect unreachable** → falls back to the deterministic rule brain (dumber, still triages). +- **Overlapping runs** → `flock` guard; the later run skips. +- A worker-run hiccup is logged but never fails the unit — the next tick retries. + +## Posture + +Conservative is the only scheduled mode. `--full-auto` (auto-send) exists but is **not** +scheduled — it broadcasts the LLM's occasionally-wrong content unattended, which the +guardrails can't prevent (they stop *security* harm, not *content* error). Revisit when the +ecosystem reaches testing. 
+ +## See also + +- `WARDEN-WP-0020` (the worker), `scripts/worker-tick.sh`, `scripts/install-worker-timer.sh` +- build-stage decision `813899f9` diff --git a/workplans/WARDEN-WP-0021-enable-scheduled-worker-tick.md b/workplans/WARDEN-WP-0021-enable-scheduled-worker-tick.md index 4ee8afe..2500d4b 100644 --- a/workplans/WARDEN-WP-0021-enable-scheduled-worker-tick.md +++ b/workplans/WARDEN-WP-0021-enable-scheduled-worker-tick.md @@ -4,7 +4,7 @@ type: workplan title: "Enable the scheduled worker tick — conservative inbox triage, unattended" domain: infotech repo: ops-warden -status: active +status: finished owner: claude topic_slug: custodian planning_priority: high @@ -88,7 +88,7 @@ state_hub_task_id: "1f35f816-1af5-46ff-b48c-1715f3ae5784" ```task id: WARDEN-WP-0021-T03 -status: todo +status: done priority: medium state_hub_task_id: "3c7f6423-8db0-4bc6-b67d-078d9d929c6d" ``` @@ -101,7 +101,7 @@ state_hub_task_id: "3c7f6423-8db0-4bc6-b67d-078d9d929c6d" ```task id: WARDEN-WP-0021-T04 -status: todo +status: done priority: high state_hub_task_id: "dabc9fc0-abb1-4e9d-b87e-5f0c5950693c" ``` @@ -116,7 +116,7 @@ state_hub_task_id: "dabc9fc0-abb1-4e9d-b87e-5f0c5950693c" ```task id: WARDEN-WP-0021-T05 -status: todo +status: done priority: medium state_hub_task_id: "9915da96-1b33-4d0f-b752-408ea8d43333" ```