@worker_app.command("run")
def worker_run(
    # NOTE: ``once`` is accepted but not read anywhere in this function; every
    # invocation performs a single pass over the inbox.
    once: Annotated[bool, typer.Option("--once", help="Process the inbox once and exit")] = True,
    dry_run: Annotated[
        bool,
        typer.Option("--dry-run/--execute", help="Plan only (default); --execute lands in WP-0020 T3"),
    ] = True,
) -> None:
    """Read ops-warden's unread coordination requests and render a guardrailed plan.

    T1 is dry-run only: it plans with the deterministic RuleBrain and applies the
    allowlist + no-secret guardrails. The llm-connect brain (T2) and executor (T3) plug
    into the same plan contract; --execute is rejected until T3 ships.
    """
    # The docstring above doubles as the command's --help text, so implementation
    # notes live in comments instead.
    from rich.markup import escape

    from warden.worker import HubClient, RuleBrain, build_plans, render_plans

    # Exit code 2: --execute is refused outright until the guarded executor exists.
    if not dry_run:
        err.print(
            "[red]--execute is not available yet[/red] (WP-0020 T3). "
            "The worker runs dry-run only until the guarded executor lands."
        )
        raise typer.Exit(2)

    # Exit code 1: the inbox could not be read.
    try:
        messages = HubClient().unread()
    except Exception as e:  # noqa: BLE001 — surface any transport error as a clean message
        # Escape the exception text: it is interpolated into a markup string and
        # may contain square brackets (URLs, server-provided detail).
        err.print(f"[red]Could not read the State Hub inbox:[/red] {escape(str(e))}")
        raise typer.Exit(1) from e

    plans = build_plans(messages, RuleBrain())
    # The rendered plan embeds untrusted inbox text (sender, subject). Print it with
    # markup disabled so "[...]" in a subject is shown literally instead of being
    # parsed as a style tag (swallowed) or an unmatched closing tag (MarkupError).
    console.print(render_plans(plans), markup=False)
    auto = sum(1 for p in plans if not p.escalated)
    console.print(
        f"\n[dim]{len(plans)} request(s): {auto} auto-actionable, "
        f"{len(plans) - auto} need a human. (dry-run — nothing executed)[/dim]"
    )
# Allowlist of action kinds the worker may take on its own; validate_action
# rejects every other kind so it escalates to a human.
ALLOWED_ACTION_KINDS = frozenset(
    {"route_answer", "reply", "mark_read", "propose_catalog_diff", "progress_note"}
)

# Signals that a task would breach the conduit-not-broker boundary (handle a secret
# value) or touch production config / irreversible state — always escalate, never auto.
_SECRET_SIGNS = re.compile(
    r"\b(token value|secret value|raw token|api[_ ]?key|password|private key|"
    r"vault[_ ]?token|npm_auth_token|client[_ ]?secret|credential value)\b",
    re.IGNORECASE,
)
# The config-file path is matched OUTSIDE the \b(...)\b group. It begins with a
# non-word character ("~" or "."), so a leading \b can never match after whitespace
# or at the start of the text and the path would silently go undetected. Matching
# on ".config/warden/warden.yaml" also covers "$HOME/..." and absolute spellings.
_PROD_SIGNS = re.compile(
    r"\b(policy\.enabled|prod flip|production config|enable the gate|deploy to prod)\b"
    r"|\.config/warden/warden\.yaml\b",
    re.IGNORECASE,
)
# A routing/credential question the worker can answer read-only.
_ROUTING_SIGNS = re.compile(
    r"\b(where|which subsystem|how do i (get|obtain)|route|who owns|"
    r"credential|warden route|warden access)\b",
    re.IGNORECASE,
)


def _payload_text(value: object) -> str:
    """Flatten a (possibly nested) payload into plain text, keys included."""
    if isinstance(value, dict):
        return " ".join(f"{k} {_payload_text(v)}" for k, v in value.items())
    if isinstance(value, (list, tuple, set, frozenset)):
        return " ".join(_payload_text(v) for v in value)
    return str(value)


def _scan_text(*parts: object) -> str:
    """Join parts into one string with every whitespace run collapsed to a space.

    The sign patterns use literal single spaces ("raw token", "prod flip"), so
    without this a phrase broken across a line or padded with extra spaces would
    slip past the guardrail.
    """
    return " ".join(" ".join(str(p) for p in parts).split())


@dataclass
class PlannedAction:
    """One action proposed by a brain, annotated by the guardrail pass."""

    kind: str
    summary: str
    payload: dict = field(default_factory=dict)
    # filled by the guardrail pass: "safe" or "escalate" (+ reason when escalated)
    risk: str = "safe"
    reason: str = ""


@dataclass
class WorkerPlan:
    """The actions proposed for a single inbox message."""

    message_id: str
    from_agent: str
    subject: str
    actions: List[PlannedAction] = field(default_factory=list)

    @property
    def escalated(self) -> bool:
        """True when any action is marked "escalate" or the plan has no actions."""
        return any(a.risk == "escalate" for a in self.actions) or not self.actions


class Brain(Protocol):
    """Turns one inbox message into a proposed WorkerPlan. Pure: no side effects."""

    def plan(self, message: dict) -> WorkerPlan: ...


def validate_action(action: PlannedAction, message: dict) -> Optional[str]:
    """Return a rejection reason if the action must escalate, else None.

    Defense-in-depth: enforced on every action regardless of what the brain proposed.

    Args:
        action: The proposed action. Its kind, summary and payload are inspected.
        message: The inbox message the action answers; ``subject`` and ``body``
            are read with ``.get`` so missing keys are treated as empty.

    Returns:
        A human-readable reason when the action is off the allowlist, or when a
        content-bearing action touches a secret value or production config;
        ``None`` when the action may proceed.
    """
    if action.kind not in ALLOWED_ACTION_KINDS:
        return f"action kind {action.kind!r} is not on the allowlist"
    if action.kind in ("reply", "route_answer", "progress_note", "propose_catalog_diff"):
        # These are fine in general, but never when the task is about a secret *value*
        # or a production-config change — those need a human. The payload is scanned
        # as well as the summary: it carries the content the action would act on, so
        # a brain must not be able to hide a secret there behind a harmless summary.
        blob = _scan_text(
            message.get("subject", ""),
            message.get("body", ""),
            action.summary,
            _payload_text(action.payload),
        )
        if _SECRET_SIGNS.search(blob):
            return "task involves a secret value (conduit-not-broker — never auto-handled)"
        if _PROD_SIGNS.search(blob):
            return "task touches production config (requires explicit human approval)"
    return None


def _guardrail(plan: WorkerPlan, message: dict) -> WorkerPlan:
    """Downgrade any action that fails validation to an escalation. Brain-agnostic.

    Mutates the actions of ``plan`` in place and returns the same plan object.
    """
    for a in plan.actions:
        reason = validate_action(a, message)
        if reason:
            a.risk = "escalate"
            a.reason = reason
    return plan


class RuleBrain:
    """Deterministic, no-LLM brain for the scaffold + tests.

    Conservative by design: it only proposes a read-only routing answer for clear
    routing questions, and escalates everything else to a human. The llm-connect brain
    (T2) replaces this with real reasoning over the same WorkerPlan contract.
    """

    def plan(self, message: dict) -> WorkerPlan:
        """Propose at most one ``route_answer`` action for ``message``.

        A plan with no actions counts as escalated (see ``WorkerPlan.escalated``),
        which is how secret/prod requests and unrecognised requests reach a human.
        """
        wp = WorkerPlan(
            message_id=str(message.get("id", "")),
            from_agent=str(message.get("from_agent", "")),
            subject=str(message.get("subject", "")),
        )
        blob = _scan_text(message.get("subject", ""), message.get("body", ""))
        if _SECRET_SIGNS.search(blob) or _PROD_SIGNS.search(blob):
            return wp  # no actions → escalates
        if _ROUTING_SIGNS.search(blob):
            wp.actions.append(
                PlannedAction(
                    kind="route_answer",
                    summary="Answer the routing/credential question via `warden route`/`access`.",
                    payload={"query": message.get("subject", "")},
                )
            )
        return wp  # otherwise no actions → escalates to a human
class HubClient:
    """Minimal read client for the State Hub inbox (honors WARDEN_HUB_URL)."""

    def __init__(self, base_url: Optional[str] = None, timeout: float = 10.0):
        """Resolve the hub base URL.

        Precedence: explicit ``base_url``, then the ``WARDEN_HUB_URL`` environment
        variable, then ``DEFAULT_HUB_URL``. An empty environment variable falls
        through to the default rather than producing a relative URL.
        """
        resolved = base_url or os.environ.get("WARDEN_HUB_URL") or DEFAULT_HUB_URL
        self.base_url = resolved.rstrip("/")
        self.timeout = timeout

    def unread(self, to_agent: str = WORKER_AGENT) -> List[dict]:
        """Fetch the unread messages addressed to ``to_agent``.

        Raises:
            httpx.HTTPStatusError: via ``raise_for_status`` on an error response.
            ValueError: when the response body is not a JSON list. Reporting an
                unexpected body as an empty inbox would let a hub contract change
                or error envelope pass as "nothing to do".
        """
        url = f"{self.base_url}/messages/"
        resp = httpx.get(
            url, params={"to_agent": to_agent, "unread_only": "true"}, timeout=self.timeout
        )
        resp.raise_for_status()
        data = resp.json()
        if not isinstance(data, list):
            raise ValueError(
                f"unexpected inbox response from {url}: "
                f"expected a JSON list, got {type(data).__name__}"
            )
        return data


def build_plans(messages: List[dict], brain: Brain) -> List[WorkerPlan]:
    """Plan every message and apply the guardrail pass. Pure — no execution."""
    return [_guardrail(brain.plan(m), m) for m in messages]


def _single_line(value: object) -> str:
    """Render ``value`` as one printable line.

    Non-printable characters (newlines, tabs, terminal escapes) become spaces and
    whitespace runs are collapsed, so text taken from a message or a brain cannot
    break out of its own line in the rendered plan.
    """
    printable = "".join(ch if ch.isprintable() else " " for ch in str(value))
    return " ".join(printable.split())


def render_plans(plans: List[WorkerPlan]) -> str:
    """Human-readable dry-run rendering.

    Each plan yields a ``[AUTO]``/``[ESCALATE]`` header line followed by one line
    per action, plus the escalation reason where there is one. Sender, subject,
    message id and action text are flattened to a single line first: they
    originate from inbox messages or a brain, and an embedded newline could
    otherwise forge an extra ``[AUTO]`` line in output a human relies on.
    """
    if not plans:
        return "inbox empty — no coordination requests for ops-warden."
    lines: List[str] = []
    for p in plans:
        tag = "ESCALATE" if p.escalated else "AUTO"
        lines.append(
            f"[{tag}] {_single_line(p.from_agent)}: {_single_line(p.subject)} "
            f"({_single_line(p.message_id)})"
        )
        if not p.actions:
            lines.append("  · no in-scope action — hand to a human")
        for a in p.actions:
            mark = "→" if a.risk == "safe" else "⚠"
            lines.append(f"  {mark} {_single_line(a.kind)}: {_single_line(a.summary)}")
            if a.risk == "escalate":
                lines.append(f"      escalated: {_single_line(a.reason)}")
    return "\n".join(lines)
assert plan.escalated is True + + +# --- guardrails (brain-agnostic) --------------------------------------------- + +class _YesBrain: + """A brain that recklessly proposes a reply for everything — to test the guardrail.""" + + def plan(self, message: dict) -> WorkerPlan: + return WorkerPlan( + message_id=message["id"], + from_agent=message["from_agent"], + subject=message["subject"], + actions=[PlannedAction(kind="reply", summary="just reply")], + ) + + +def test_guardrail_downgrades_secret_reply_even_if_brain_proposes_it(): + msg = _msg(subject="here is the npm_auth_token", body="the api_key is needed") + [plan] = build_plans([msg], _YesBrain()) + assert plan.escalated is True + assert plan.actions[0].risk == "escalate" + assert "secret" in plan.actions[0].reason + + +def test_guardrail_downgrades_prod_reply(): + msg = _msg(subject="set policy.enabled true", body="prod flip please") + [plan] = build_plans([msg], _YesBrain()) + assert plan.actions[0].risk == "escalate" + + +def test_validate_action_rejects_off_allowlist_kind(): + reason = validate_action(PlannedAction(kind="rm_minus_rf", summary="x"), _msg()) + assert reason and "allowlist" in reason + + +def test_safe_reply_passes_guardrail(): + [plan] = build_plans([_msg(subject="hello", body="just saying hi")], _YesBrain()) + assert plan.actions[0].risk == "safe" + + +# --- rendering --------------------------------------------------------------- + +def test_render_empty(): + assert "inbox empty" in render_plans([]) + + +def test_render_marks_auto_and_escalate(): + plans = build_plans([_msg(), _msg(id="m2", subject="raw token value please")], RuleBrain()) + out = render_plans(plans) + assert "AUTO" in out and "ESCALATE" in out + + +# --- CLI --------------------------------------------------------------------- + +def test_cli_worker_dry_run(monkeypatch): + monkeypatch.setattr("warden.worker.HubClient.unread", lambda self, to_agent="ops-warden": [_msg()]) + r = runner.invoke(app, ["worker", "run", "--dry-run"]) 
+ assert r.exit_code == 0 + assert "AUTO" in r.stdout + assert "nothing executed" in r.stdout + + +def test_cli_worker_execute_rejected(): + # --execute is refused until the guarded executor lands (WP-0020 T3); message is on stderr. + r = runner.invoke(app, ["worker", "run", "--execute"]) + assert r.exit_code == 2 diff --git a/workplans/WARDEN-WP-0020-ops-warden-worker.md b/workplans/WARDEN-WP-0020-ops-warden-worker.md new file mode 100644 index 0000000..85a06b8 --- /dev/null +++ b/workplans/WARDEN-WP-0020-ops-warden-worker.md @@ -0,0 +1,135 @@ +--- +id: WARDEN-WP-0020 +type: workplan +title: "ops-warden worker — autonomous coordination via llm-connect" +domain: infotech +repo: ops-warden +status: active +owner: claude +topic_slug: custodian +planning_priority: high +planning_order: 20 +created: "2026-06-29" +updated: "2026-06-29" +--- + +# WARDEN-WP-0020 — ops-warden worker (`warden worker`) + +**Problem:** ops-warden's coordination lane (State Hub inbox `to_agent=ops-warden`) is +handled only when a human spins up an ops-warden session and relays instructions. That +doesn't scale — Bernd is hand-relaying between flex-auth ↔ secrets-engine ↔ ops-warden +across sessions. + +**Goal:** a `warden worker` CLI that pulls ops-warden's unread coordination requests and, +using **llm-connect** for inference, drives each to an ops-warden action (answer a routing +question, draft+send a reply, mark read, propose/commit a catalog diff, or escalate) — so +the inbox is handled without a human starting a session. + +**Decisions (Bernd, 2026-06-29):** **full-auto in-scope** (worker executes any in-scope +action; escalates only secrets/prod/out-of-scope) and **scheduled/unattended** (cron or +activity-core). Because there is no human in the loop for in-scope actions, the guardrails +are load-bearing and the rollout is staged: **dry-run → manual → scheduled**. 
+ +**Build vs reuse:** inference = llm-connect (`/execute`); trigger = cron or activity-core +(reuse the durable task factory, don't reinvent scheduling). Worker logic lives in warden. + +## Guardrails (non-negotiable — full-auto rests on these) +1. **Fixed charter, non-overridable.** The boundary (issue SSH; route everything else; + conduit-not-broker; never hold/print a secret value) is a fixed system policy. Message + content is **untrusted data**, never instructions that can relax it (prompt-injection + containment). +2. **Action allowlist.** Every action is validated against an allowlist before execution; + off-list → escalate. No secret handling, no prod-config writes, no irreversible/outward + actions without an explicit human ack. +3. **No-secret invariant.** Refuse any task requiring a secret value in hand or in a prompt. +4. **Full audit + dry-run.** Every action emits a progress event; `--dry-run` shows the + plan without executing. Scheduled mode only after a clean dry-run shakedown. + +## Hard dependency +llm-connect must be operational — it needs its provider key (`OPENROUTER_API_KEY`, +CCR-2026-0003, currently deferred by railiance-platform/secrets-engine). The worker is +built against llm-connect's contract; it cannot run the brain until that lands. + +--- + +## Tasks + +### T1 — Worker scaffold (llm-connect-independent, safe) + +```task +id: WARDEN-WP-0020-T01 +status: done +priority: high +``` + +- [x] `src/warden/worker.py`: State Hub inbox client (`HubClient.unread`), a `Brain` + protocol, a deterministic `RuleBrain` default (answers clear routing questions; + escalates the rest), the `PlannedAction`/`WorkerPlan` model, the guardrail allowlist + + `validate_action` (enforced brain-agnostically in `build_plans`), and a `render_plans` + dry-run renderer (plan only, no execution). +- [x] `warden worker run [--once] [--dry-run]` CLI; `--dry-run` is the default and + `--execute` is refused (exit 2) until the guarded executor lands (T3). 
+- [x] `tests/test_worker.py` (RuleBrain routing/secret/prod/unknown, guardrail downgrades a + reckless brain on secret/prod, off-allowlist rejection, render, CLI). 12 cases. +- [x] Live dry-run against the real hub verified — read the inbox and produced a guardrailed + plan (it surfaced secrets-engine's OIDC-role reply, demonstrating the value). + +### T2 — llm-connect brain + +```task +id: WARDEN-WP-0020-T02 +status: todo +priority: high +``` + +- [ ] `LlmConnectBrain`: POST to llm-connect `/execute` with the fixed charter system + policy + the message as untrusted data; parse a structured action plan. Configurable + `llm_connect_url`. Blocked on llm-connect's API contract + it being operational. + +### T3 — Action dispatch + guardrails (full-auto in-scope) + +```task +id: WARDEN-WP-0020-T03 +status: todo +priority: high +``` + +- [ ] Execute in-scope actions: `warden route/access` answers, drafted replies, mark-read, + catalog/playbook diffs (commit + sync). Enforce the allowlist + no-secret invariant in + code; per-action progress-event audit; escalation path to a human queue. + +### T4 — Scheduled trigger + +```task +id: WARDEN-WP-0020-T04 +status: todo +priority: medium +``` + +- [ ] Wire cron or activity-core to `warden worker run --once`. Ships **disabled**; enabled + only after a clean dry-run shakedown. Concurrency guard (no overlapping runs). + +### T5 — Docs / SCOPE / INTENT + +```task +id: WARDEN-WP-0020-T05 +status: todo +priority: medium +``` + +- [ ] Record the scope expansion: ops-warden gains an autonomous coordination worker. + Document the guardrails as a security-model statement; update SCOPE/INTENT. + +--- + +## Acceptance + +- `warden worker run --dry-run` reads the real inbox and prints a guardrailed plan. +- Full-auto execution runs only in-scope, allowlisted actions; secrets/prod/out-of-scope + escalate; every action is audited. No secret value ever enters a prompt, log, or commit. 
+- Scheduled mode is enabled only after a dry-run shakedown. + +## See also + +- llm-connect (inference), activity-core (durable trigger), kaizen-agentic (personas) +- `.claude/rules/credential-routing.md` (the boundary the worker enforces)