feat(WARDEN-WP-0020): conservative triage tier as the --execute default (Option A)

Per Bernd's call: the guardrails prevent security harm but not LLM content errors, so the
worker should triage + draft, not auto-send, until reply quality is proven (matches the
build-stage/recoverability posture).

run_conservative triages NEW messages into a reviewed digest (state_dir/worker-digest.md)
with drafted replies, posts ONE progress note, tracks seen message ids (schedule-safe
dedup), and sends NOTHING to other agents / marks nothing read. `warden worker run
--execute` now runs this conservative tier; `--full-auto` opts into the auto-send path.

Live-verified with the LLM brain on the real inbox: produced a high-quality draft reply to
a secrets-engine coordination message and correctly flagged the llm-connect custody request
as NEEDS YOU. Conservative mode is safe to schedule (T4). 244 tests, lint clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-30 00:38:36 +02:00
parent a55b3b7735
commit d0261ebb52
4 changed files with 137 additions and 12 deletions

View File

@@ -1164,15 +1164,21 @@ def worker_run(
str,
typer.Option("--brain", help="Planner: 'rule' (deterministic, default) or 'llm' (llm-connect)"),
] = "rule",
full_auto: Annotated[
bool,
typer.Option("--full-auto", help="With --execute: auto-send replies + mark-read (default is conservative: triage + drafts only)"),
] = False,
) -> None:
"""Read ops-warden's unread coordination requests and render a guardrailed plan.
"""Read ops-warden's unread coordination requests and act on them, guardrailed.
Plans with the deterministic RuleBrain (default) or the llm-connect brain (--brain llm).
Either way the allowlist + no-secret guardrails are enforced on every action. --execute
is rejected until the guarded executor (T3) ships; dry-run is the default.
Default `--dry-run` previews. `--execute` runs the **conservative** tier: triage new
messages into a reviewed digest with drafted replies, post one progress note, and send
NOTHING to other agents (safe to schedule). `--execute --full-auto` auto-sends the safe
allowlisted actions. The allowlist + no-secret guardrails hold in every mode.
"""
from warden.worker import (
HubClient, LlmConnectBrain, RuleBrain, build_plans, execute_plans, render_plans,
run_conservative,
)
if brain not in ("rule", "llm"):
@@ -1198,7 +1204,11 @@ def worker_run(
)
return
# --execute: run the guarded executor. Topic for audit progress events.
# --execute. Topic for audit progress events.
topic_id = "cee7bedf-2b48-46ef-8601-006474f2ad7a"
console.print("[yellow]Executing (full-auto, in-scope only; escalations left for a human)…[/yellow]")
console.print(execute_plans(plans, hub, topic_id=topic_id))
if full_auto:
console.print("[yellow]Executing FULL-AUTO (in-scope only; escalations left for a human)…[/yellow]")
console.print(execute_plans(plans, hub, topic_id=topic_id))
else:
console.print("[green]Conservative triage[/green] — drafting; nothing sent to other agents.")
console.print(run_conservative(plans, hub, topic_id=topic_id))

View File

@@ -16,6 +16,7 @@ from __future__ import annotations
import os
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Optional, Protocol
import httpx
@@ -329,7 +330,7 @@ def execute_plan(plan: WorkerPlan, hub: HubClient, *, topic_id: Optional[str] =
def execute_plans(plans: List[WorkerPlan], hub: HubClient, *, topic_id: Optional[str] = None) -> str:
"""Execute every plan and return a human-readable audit summary."""
"""FULL-AUTO: execute every plan's safe actions and return an audit summary."""
lines: List[str] = []
for p in plans:
results = execute_plan(p, hub, topic_id=topic_id)
@@ -339,6 +340,85 @@ def execute_plans(plans: List[WorkerPlan], hub: HubClient, *, topic_id: Optional
return "\n".join(lines) if lines else "inbox empty — nothing to execute."
# --- conservative tier (default for --execute): triage + draft, never auto-send ----------
def default_state_dir() -> Path:
return Path(os.environ.get("WARDEN_STATE_DIR", str(Path.home() / ".local" / "state" / "warden")))
def load_seen(state_dir: Path) -> set:
import json as _json
p = state_dir / "worker-seen.json"
if not p.exists():
return set()
try:
return set(_json.loads(p.read_text()))
except (ValueError, OSError):
return set()
def save_seen(state_dir: Path, seen: set) -> None:
import json as _json
(state_dir / "worker-seen.json").write_text(_json.dumps(sorted(seen)))
def build_digest(plans: List[WorkerPlan]) -> str:
"""Human-reviewable digest of proposed actions + drafted replies. Sends nothing."""
if not plans:
return "No new coordination requests."
lines: List[str] = []
for p in plans:
tag = "NEEDS YOU" if p.escalated else "DRAFT READY"
lines.append(f"## [{tag}] {p.from_agent}: {p.subject} ({p.message_id})")
if not p.actions:
lines.append("- no in-scope action — handle directly")
for a in p.actions:
if a.risk == "escalate":
lines.append(f"- escalated ({a.reason}): {a.summary}")
elif a.kind == "route_answer" and a.payload.get("answer"):
lines.append(f"- proposed answer: {a.payload['answer']}")
elif a.kind == "reply" and a.payload.get("body"):
lines.append(f"- proposed reply: {a.payload['body']}")
else:
lines.append(f"- {a.kind}: {a.summary}")
lines.append("")
return "\n".join(lines).rstrip()
def run_conservative(
plans: List[WorkerPlan], hub: HubClient, *, topic_id: Optional[str] = None,
state_dir: Optional[Path] = None,
) -> str:
"""Triage NEW messages into a reviewed digest. No agent-facing sends, no mark-read.
Safe to schedule: it only surfaces what's waiting (with drafted replies for you to
approve), tracks which messages it has already digested, and posts one progress note
so a scheduled run is visible. The operator approves/sends the good drafts.
"""
state_dir = state_dir or default_state_dir()
state_dir.mkdir(parents=True, exist_ok=True)
seen = load_seen(state_dir)
new = [p for p in plans if p.message_id and p.message_id not in seen]
digest = build_digest(new)
(state_dir / "worker-digest.md").write_text(digest + "\n")
if new:
n_esc = sum(1 for p in new if p.escalated)
try:
hub.add_progress(
summary=(
f"[worker] triaged {len(new)} new message(s): {len(new) - n_esc} with "
f"drafted replies, {n_esc} need you. Drafts: {state_dir / 'worker-digest.md'}"
),
topic_id=topic_id,
)
except Exception: # noqa: BLE001 — a note failure must not lose the digest
pass
save_seen(state_dir, seen | {p.message_id for p in new})
return digest
def draft_route_answer(query: str) -> str:
"""Compute the routing answer the worker would send for a query. Read-only.

View File

@@ -10,8 +10,10 @@ from warden.worker import (
RuleBrain,
WorkerPlan,
_extract_json,
build_digest,
build_plans,
render_plans,
run_conservative,
validate_action,
)
@@ -171,13 +173,39 @@ def test_cli_worker_dry_run(monkeypatch):
assert "nothing executed" in r.stdout
def test_cli_worker_execute_runs(monkeypatch):
# --execute now runs the guarded executor; empty inbox → clean exit.
def test_cli_worker_execute_runs(monkeypatch, tmp_path):
# --execute runs the conservative tier; empty inbox → clean exit.
monkeypatch.setenv("WARDEN_STATE_DIR", str(tmp_path))
monkeypatch.setattr("warden.worker.HubClient.unread", lambda self, to_agent="ops-warden": [])
r = runner.invoke(app, ["worker", "run", "--execute"])
assert r.exit_code == 0
# --- conservative tier (Option A) --------------------------------------------
def test_build_digest_shows_drafts_and_escalations():
p1 = _plan([PlannedAction(kind="reply", summary="ack", payload={"body": "hello there"})])
p2 = _plan([PlannedAction(kind="reply", summary="x", risk="escalate", reason="secret")],
message_id="m2")
out = build_digest([p1, p2])
assert "DRAFT READY" in out and "NEEDS YOU" in out and "hello there" in out
def test_run_conservative_drafts_no_sends_and_dedups(tmp_path):
hub = _FakeHub()
p = _plan([PlannedAction(kind="route_answer", summary="a", payload={"answer": "the answer"})])
run_conservative([p], hub, topic_id="t", state_dir=tmp_path)
# never sends to other agents or marks read — only a single progress note
assert not any(c[0] in ("reply", "mark_read") for c in hub.calls)
assert any(c[0] == "progress" for c in hub.calls)
digest = (tmp_path / "worker-digest.md").read_text()
assert "the answer" in digest
# second run: message already seen → no new progress note (schedule-safe dedup)
hub2 = _FakeHub()
run_conservative([p], hub2, topic_id="t", state_dir=tmp_path)
assert not any(c[0] == "progress" for c in hub2.calls)
# --- executor (T3) -----------------------------------------------------------
class _FakeHub:

View File

@@ -120,8 +120,15 @@ state_hub_task_id: "3a71965e-42d5-4258-9761-aced804c88e7"
per-message audit summary. Tests in `tests/test_worker.py` (route_answer reply+mark,
reply-with/without-body, escalated skip, catalog-diff left-for-human, progress_note,
failure-without-crash). 243 pass, lint clean.
- Note: first **live** `--execute` shakedown is the operator's (staged rollout: dry-run →
manual → scheduled); T4 wraps it on a schedule.
- [x] **Conservative tier is now the `--execute` default (Bernd's Option A, 2026-06-30):**
`run_conservative` triages NEW messages into a reviewed digest (`worker-digest.md`)
with drafted replies, posts ONE progress note, tracks seen ids (schedule-safe dedup),
and sends **nothing** to other agents / marks nothing read. `--full-auto` opts into the
auto-send path. Live-verified with the LLM brain: produced a high-quality draft reply
to secrets-engine and flagged the llm-connect request as NEEDS YOU. 244 tests.
Rationale: the guardrails prevent *security* harm but not LLM *content* errors, so replies
stay drafts-for-approval until quality is proven — matches the build-stage/recoverability
posture. Conservative mode is safe to schedule (T4).
### T4 — Scheduled trigger