diff --git a/src/warden/cli.py b/src/warden/cli.py index 9de31a1..277fd70 100644 --- a/src/warden/cli.py +++ b/src/warden/cli.py @@ -1164,15 +1164,21 @@ def worker_run( str, typer.Option("--brain", help="Planner: 'rule' (deterministic, default) or 'llm' (llm-connect)"), ] = "rule", + full_auto: Annotated[ + bool, + typer.Option("--full-auto", help="With --execute: auto-send replies + mark-read (default is conservative: triage + drafts only)"), + ] = False, ) -> None: - """Read ops-warden's unread coordination requests and render a guardrailed plan. + """Read ops-warden's unread coordination requests and act on them, guardrailed. - Plans with the deterministic RuleBrain (default) or the llm-connect brain (--brain llm). - Either way the allowlist + no-secret guardrails are enforced on every action. --execute - is rejected until the guarded executor (T3) ships; dry-run is the default. + Default `--dry-run` previews. `--execute` runs the **conservative** tier: triage new + messages into a reviewed digest with drafted replies, post one progress note, and send + NOTHING to other agents (safe to schedule). `--execute --full-auto` auto-sends the safe + allowlisted actions. The allowlist + no-secret guardrails hold in every mode. """ from warden.worker import ( HubClient, LlmConnectBrain, RuleBrain, build_plans, execute_plans, render_plans, + run_conservative, ) if brain not in ("rule", "llm"): @@ -1198,7 +1204,11 @@ def worker_run( ) return - # --execute: run the guarded executor. Topic for audit progress events. + # --execute. Topic for audit progress events. topic_id = "cee7bedf-2b48-46ef-8601-006474f2ad7a" - console.print("[yellow]Executing (full-auto, in-scope only; escalations left for a human)…[/yellow]") - console.print(execute_plans(plans, hub, topic_id=topic_id)) + if full_auto: + console.print("[yellow]Executing FULL-AUTO (in-scope only; escalations left for a human)…[/yellow]") + console.print(execute_plans(plans, hub, topic_id=topic_id)) + else: + console.print("[green]Conservative triage[/green] — drafting; nothing sent to other agents.") + console.print(run_conservative(plans, hub, topic_id=topic_id)) diff --git a/src/warden/worker.py b/src/warden/worker.py index 19112f5..1fe53ef 100644 --- a/src/warden/worker.py +++ b/src/warden/worker.py @@ -16,6 +16,7 @@ from __future__ import annotations import os import re from dataclasses import dataclass, field +from pathlib import Path from typing import List, Optional, Protocol import httpx @@ -329,7 +330,7 @@ def execute_plan(plan: WorkerPlan, hub: HubClient, *, topic_id: Optional[str] = def execute_plans(plans: List[WorkerPlan], hub: HubClient, *, topic_id: Optional[str] = None) -> str: - """Execute every plan and return a human-readable audit summary.""" + """FULL-AUTO: execute every plan's safe actions and return an audit summary.""" lines: List[str] = [] for p in plans: results = execute_plan(p, hub, topic_id=topic_id) @@ -339,6 +340,85 @@ def execute_plans(plans: List[WorkerPlan], hub: HubClient, *, topic_id: Optional return "\n".join(lines) if lines else "inbox empty — nothing to execute." +# --- conservative tier (default for --execute): triage + draft, never auto-send ---------- + +def default_state_dir() -> Path: + return Path(os.environ.get("WARDEN_STATE_DIR", str(Path.home() / ".local" / "state" / "warden"))) + + +def load_seen(state_dir: Path) -> set: + import json as _json + + p = state_dir / "worker-seen.json" + if not p.exists(): + return set() + try: + return set(_json.loads(p.read_text())) + except (ValueError, OSError): + return set() + + +def save_seen(state_dir: Path, seen: set) -> None: + import json as _json + + (state_dir / "worker-seen.json").write_text(_json.dumps(sorted(seen))) + + +def build_digest(plans: List[WorkerPlan]) -> str: + """Human-reviewable digest of proposed actions + drafted replies. Sends nothing.""" + if not plans: + return "No new coordination requests." + lines: List[str] = [] + for p in plans: + tag = "NEEDS YOU" if p.escalated else "DRAFT READY" + lines.append(f"## [{tag}] {p.from_agent}: {p.subject} ({p.message_id})") + if not p.actions: + lines.append("- no in-scope action — handle directly") + for a in p.actions: + if a.risk == "escalate": + lines.append(f"- escalated ({a.reason}): {a.summary}") + elif a.kind == "route_answer" and a.payload.get("answer"): + lines.append(f"- proposed answer: {a.payload['answer']}") + elif a.kind == "reply" and a.payload.get("body"): + lines.append(f"- proposed reply: {a.payload['body']}") + else: + lines.append(f"- {a.kind}: {a.summary}") + lines.append("") + return "\n".join(lines).rstrip() + + +def run_conservative( + plans: List[WorkerPlan], hub: HubClient, *, topic_id: Optional[str] = None, + state_dir: Optional[Path] = None, +) -> str: + """Triage NEW messages into a reviewed digest. No agent-facing sends, no mark-read. + + Safe to schedule: it only surfaces what's waiting (with drafted replies for you to + approve), tracks which messages it has already digested, and posts one progress note + so a scheduled run is visible. The operator approves/sends the good drafts. + """ + state_dir = state_dir or default_state_dir() + state_dir.mkdir(parents=True, exist_ok=True) + seen = load_seen(state_dir) + new = [p for p in plans if p.message_id and p.message_id not in seen] + digest = build_digest(new) + (state_dir / "worker-digest.md").write_text(digest + "\n") + if new: + n_esc = sum(1 for p in new if p.escalated) + try: + hub.add_progress( + summary=( + f"[worker] triaged {len(new)} new message(s): {len(new) - n_esc} with " + f"drafted replies, {n_esc} need you. Drafts: {state_dir / 'worker-digest.md'}" + ), + topic_id=topic_id, + ) + except Exception: # noqa: BLE001 — a note failure must not lose the digest + pass + save_seen(state_dir, seen | {p.message_id for p in new}) + return digest + + def draft_route_answer(query: str) -> str: """Compute the routing answer the worker would send for a query. Read-only. diff --git a/tests/test_worker.py b/tests/test_worker.py index 9ddf24a..5985f6f 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -10,8 +10,10 @@ from warden.worker import ( RuleBrain, WorkerPlan, _extract_json, + build_digest, build_plans, render_plans, + run_conservative, validate_action, ) @@ -171,13 +173,39 @@ def test_cli_worker_dry_run(monkeypatch): assert "nothing executed" in r.stdout -def test_cli_worker_execute_runs(monkeypatch): - # --execute now runs the guarded executor; empty inbox → clean exit. +def test_cli_worker_execute_runs(monkeypatch, tmp_path): + # --execute runs the conservative tier; empty inbox → clean exit. + monkeypatch.setenv("WARDEN_STATE_DIR", str(tmp_path)) monkeypatch.setattr("warden.worker.HubClient.unread", lambda self, to_agent="ops-warden": []) r = runner.invoke(app, ["worker", "run", "--execute"]) assert r.exit_code == 0 +# --- conservative tier (Option A) -------------------------------------------- + +def test_build_digest_shows_drafts_and_escalations(): + p1 = _plan([PlannedAction(kind="reply", summary="ack", payload={"body": "hello there"})]) + p2 = _plan([PlannedAction(kind="reply", summary="x", risk="escalate", reason="secret")], + message_id="m2") + out = build_digest([p1, p2]) + assert "DRAFT READY" in out and "NEEDS YOU" in out and "hello there" in out + + +def test_run_conservative_drafts_no_sends_and_dedups(tmp_path): + hub = _FakeHub() + p = _plan([PlannedAction(kind="route_answer", summary="a", payload={"answer": "the answer"})]) + run_conservative([p], hub, topic_id="t", state_dir=tmp_path) + # never sends to other agents or marks read — only a single progress note + assert not any(c[0] in ("reply", "mark_read") for c in hub.calls) + assert any(c[0] == "progress" for c in hub.calls) + digest = (tmp_path / "worker-digest.md").read_text() + assert "the answer" in digest + # second run: message already seen → no new progress note (schedule-safe dedup) + hub2 = _FakeHub() + run_conservative([p], hub2, topic_id="t", state_dir=tmp_path) + assert not any(c[0] == "progress" for c in hub2.calls) + + # --- executor (T3) ----------------------------------------------------------- class _FakeHub: diff --git a/workplans/WARDEN-WP-0020-ops-warden-worker.md b/workplans/WARDEN-WP-0020-ops-warden-worker.md index 50370c6..94a0c9e 100644 --- a/workplans/WARDEN-WP-0020-ops-warden-worker.md +++ b/workplans/WARDEN-WP-0020-ops-warden-worker.md @@ -120,8 +120,15 @@ state_hub_task_id: "3a71965e-42d5-4258-9761-aced804c88e7" per-message audit summary. Tests in `tests/test_worker.py` (route_answer reply+mark, reply-with/without-body, escalated skip, catalog-diff left-for-human, progress_note, failure-without-crash). 243 pass, lint clean. -- Note: first **live** `--execute` shakedown is the operator's (staged rollout: dry-run → - manual → scheduled); T4 wraps it on a schedule. +- [x] **Conservative tier is now the `--execute` default (Bernd's Option A, 2026-06-30):** + `run_conservative` triages NEW messages into a reviewed digest (`worker-digest.md`) + with drafted replies, posts ONE progress note, tracks seen ids (schedule-safe dedup), + and sends **nothing** to other agents / marks nothing read. `--full-auto` opts into the + auto-send path. Live-verified with the LLM brain: produced a high-quality draft reply + to secrets-engine and flagged the llm-connect request as NEEDS YOU. 244 tests. + Rationale: the guardrails prevent *security* harm but not LLM *content* errors, so replies + stay drafts-for-approval until quality is proven — matches the build-stage/recoverability + posture. Conservative mode is safe to schedule (T4). ### T4 — Scheduled trigger