feat(WARDEN-WP-0020): ops-warden coordination worker — T1 dry-run scaffold

Foundation for an autonomous worker that handles ops-warden's State Hub coordination
lane via llm-connect (Bernd's call: full-auto in-scope + scheduled, staged dry-run ->
manual -> scheduled). T1 is the llm-connect-independent, safe slice:

src/warden/worker.py — HubClient (read unread to_agent=ops-warden), Brain protocol,
deterministic RuleBrain (answers clear routing questions, escalates the rest),
PlannedAction/WorkerPlan model, guardrail allowlist + validate_action enforced
brain-agnostically (no-secret invariant + prod-config + off-allowlist all escalate),
render_plans dry-run output. `warden worker run --dry-run` (default); --execute refused
(exit 2) until the guarded executor (T3) lands.

Guardrails are load-bearing because full-auto has no human in the loop: message content
is untrusted data, the allowlist is enforced regardless of what the brain proposes.

Hard dependency flagged in the workplan: the brain is llm-connect, which needs its
provider key (OPENROUTER_API_KEY, deferred CCR-2026-0003) before it can run.

18 worker tests; 229 pass, lint clean. Live dry-run against the real hub verified.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-29 19:07:06 +02:00
parent 69d8ee848f
commit 211994ddbb
4 changed files with 473 additions and 0 deletions

View File

@@ -34,6 +34,12 @@ policy_app = typer.Typer(
)
app.add_typer(policy_app, name="policy")
worker_app = typer.Typer(
help="Autonomous coordination worker (WP-0020; dry-run only until executor lands)",
no_args_is_help=True,
)
app.add_typer(worker_app, name="worker")
console = Console()
err = Console(stderr=True)
@@ -1141,3 +1147,45 @@ def policy_show(
floor = [dc for dc, lvl in cat.dataclass_floor.items() if lvl == mat.id]
if floor:
console.print(f" {'dataclass floor':14}: {', '.join(floor)} require this level")
# ---------------------------------------------------------------------------
# warden worker — autonomous coordination worker (WP-0020 T1: dry-run scaffold)
# ---------------------------------------------------------------------------
@worker_app.command("run")
def worker_run(
once: Annotated[bool, typer.Option("--once", help="Process the inbox once and exit")] = True,
dry_run: Annotated[
bool,
typer.Option("--dry-run/--execute", help="Plan only (default); --execute lands in WP-0020 T3"),
] = True,
) -> None:
"""Read ops-warden's unread coordination requests and render a guardrailed plan.
T1 is dry-run only: it plans with the deterministic RuleBrain and applies the
allowlist + no-secret guardrails. The llm-connect brain (T2) and executor (T3) plug
into the same plan contract; --execute is rejected until T3 ships.
"""
from warden.worker import HubClient, RuleBrain, build_plans, render_plans
if not dry_run:
err.print(
"[red]--execute is not available yet[/red] (WP-0020 T3). "
"The worker runs dry-run only until the guarded executor lands."
)
raise typer.Exit(2)
try:
messages = HubClient().unread()
except Exception as e: # noqa: BLE001 — surface any transport error as a clean message
err.print(f"[red]Could not read the State Hub inbox:[/red] {e}")
raise typer.Exit(1)
plans = build_plans(messages, RuleBrain())
console.print(render_plans(plans))
auto = sum(1 for p in plans if not p.escalated)
console.print(
f"\n[dim]{len(plans)} request(s): {auto} auto-actionable, "
f"{len(plans) - auto} need a human. (dry-run — nothing executed)[/dim]"
)

172
src/warden/worker.py Normal file
View File

@@ -0,0 +1,172 @@
"""ops-warden coordination worker (WARDEN-WP-0020).
Pulls ops-warden's unread State Hub coordination requests and turns each into a
**plan** of ops-warden actions. This module is the llm-connect-independent foundation
(T1): the inbox client, the plan model, the deterministic ``RuleBrain`` default, the
guardrail allowlist, and the dry-run renderer. The llm-connect brain (T2) and the
executing dispatcher (T3) plug into the same ``Brain`` protocol and ``WorkerPlan``.
Guardrails live here, not in the brain — the allowlist and no-secret invariant are
enforced on every action *regardless* of what the brain proposes, so an LLM (or a
prompt-injected message) cannot widen ops-warden's authority. Dry-run is the default;
nothing executes in T1.
"""
from __future__ import annotations
import os
import re
from dataclasses import dataclass, field
from typing import List, Optional, Protocol
import httpx
DEFAULT_HUB_URL = "http://127.0.0.1:8000"
WORKER_AGENT = "ops-warden"
# Actions the worker may take autonomously. Anything else escalates to a human.
ALLOWED_ACTION_KINDS = frozenset(
{"route_answer", "reply", "mark_read", "propose_catalog_diff", "progress_note"}
)
# Signals that a task would breach the conduit-not-broker boundary (handle a secret
# value) or touch production config / irreversible state — always escalate, never auto.
_SECRET_SIGNS = re.compile(
r"\b(token value|secret value|raw token|api[_ ]?key|password|private key|"
r"vault[_ ]?token|npm_auth_token|client[_ ]?secret|credential value)\b",
re.IGNORECASE,
)
_PROD_SIGNS = re.compile(
r"\b(policy\.enabled|prod flip|production config|enable the gate|"
r"~/\.config/warden/warden\.yaml|deploy to prod)\b",
re.IGNORECASE,
)
# A routing/credential question the worker can answer read-only.
_ROUTING_SIGNS = re.compile(
r"\b(where|which subsystem|how do i (get|obtain)|route|who owns|"
r"credential|warden route|warden access)\b",
re.IGNORECASE,
)
@dataclass
class PlannedAction:
kind: str
summary: str
payload: dict = field(default_factory=dict)
# filled by the guardrail pass: "safe" or "escalate" (+ reason when escalated)
risk: str = "safe"
reason: str = ""
@dataclass
class WorkerPlan:
message_id: str
from_agent: str
subject: str
actions: List[PlannedAction] = field(default_factory=list)
@property
def escalated(self) -> bool:
return any(a.risk == "escalate" for a in self.actions) or not self.actions
class Brain(Protocol):
"""Turns one inbox message into a proposed WorkerPlan. Pure: no side effects."""
def plan(self, message: dict) -> WorkerPlan: ...
def validate_action(action: PlannedAction, message: dict) -> Optional[str]:
"""Return a rejection reason if the action must escalate, else None.
Defense-in-depth: enforced on every action regardless of what the brain proposed.
"""
if action.kind not in ALLOWED_ACTION_KINDS:
return f"action kind {action.kind!r} is not on the allowlist"
blob = f"{message.get('subject', '')} {message.get('body', '')} {action.summary}"
if action.kind in ("reply", "route_answer", "progress_note", "propose_catalog_diff"):
# These are fine in general, but never when the task is about a secret *value*
# or a production-config change — those need a human.
if _SECRET_SIGNS.search(blob):
return "task involves a secret value (conduit-not-broker — never auto-handled)"
if _PROD_SIGNS.search(blob):
return "task touches production config (requires explicit human approval)"
return None
def _guardrail(plan: WorkerPlan, message: dict) -> WorkerPlan:
"""Downgrade any action that fails validation to an escalation. Brain-agnostic."""
for a in plan.actions:
reason = validate_action(a, message)
if reason:
a.risk = "escalate"
a.reason = reason
return plan
class RuleBrain:
"""Deterministic, no-LLM brain for the scaffold + tests.
Conservative by design: it only proposes a read-only routing answer for clear
routing questions, and escalates everything else to a human. The llm-connect brain
(T2) replaces this with real reasoning over the same WorkerPlan contract.
"""
def plan(self, message: dict) -> WorkerPlan:
wp = WorkerPlan(
message_id=str(message.get("id", "")),
from_agent=str(message.get("from_agent", "")),
subject=str(message.get("subject", "")),
)
blob = f"{message.get('subject', '')} {message.get('body', '')}"
if _SECRET_SIGNS.search(blob) or _PROD_SIGNS.search(blob):
return wp # no actions → escalates
if _ROUTING_SIGNS.search(blob):
wp.actions.append(
PlannedAction(
kind="route_answer",
summary="Answer the routing/credential question via `warden route`/`access`.",
payload={"query": message.get("subject", "")},
)
)
return wp # otherwise no actions → escalates to a human
class HubClient:
"""Minimal read client for the State Hub inbox (honors WARDEN_HUB_URL)."""
def __init__(self, base_url: Optional[str] = None, timeout: float = 10.0):
self.base_url = (base_url or os.environ.get("WARDEN_HUB_URL", DEFAULT_HUB_URL)).rstrip("/")
self.timeout = timeout
def unread(self, to_agent: str = WORKER_AGENT) -> List[dict]:
url = f"{self.base_url}/messages/"
resp = httpx.get(
url, params={"to_agent": to_agent, "unread_only": "true"}, timeout=self.timeout
)
resp.raise_for_status()
data = resp.json()
return data if isinstance(data, list) else []
def build_plans(messages: List[dict], brain: Brain) -> List[WorkerPlan]:
"""Plan every message and apply the guardrail pass. Pure — no execution."""
return [_guardrail(brain.plan(m), m) for m in messages]
def render_plans(plans: List[WorkerPlan]) -> str:
"""Human-readable dry-run rendering."""
if not plans:
return "inbox empty — no coordination requests for ops-warden."
lines: List[str] = []
for p in plans:
tag = "ESCALATE" if p.escalated else "AUTO"
lines.append(f"[{tag}] {p.from_agent}: {p.subject} ({p.message_id})")
if not p.actions:
lines.append(" · no in-scope action — hand to a human")
for a in p.actions:
mark = "" if a.risk == "safe" else ""
lines.append(f" {mark} {a.kind}: {a.summary}")
if a.risk == "escalate":
lines.append(f" escalated: {a.reason}")
return "\n".join(lines)

118
tests/test_worker.py Normal file
View File

@@ -0,0 +1,118 @@
"""Tests for the ops-warden coordination worker scaffold (WARDEN-WP-0020 T1)."""
from __future__ import annotations
from typer.testing import CliRunner
from warden.cli import app
from warden.worker import (
PlannedAction,
RuleBrain,
WorkerPlan,
build_plans,
render_plans,
validate_action,
)
runner = CliRunner()
def _msg(**over) -> dict:
base = {
"id": "m1",
"from_agent": "someone",
"subject": "Where do I get an npm token?",
"body": "Which subsystem owns this credential — how do I obtain it?",
}
base.update(over)
return base
# --- RuleBrain ----------------------------------------------------------------
def test_rulebrain_answers_routing_question():
plan = RuleBrain().plan(_msg())
assert [a.kind for a in plan.actions] == ["route_answer"]
assert plan.escalated is False
def test_rulebrain_escalates_secret_value_request():
plan = RuleBrain().plan(_msg(subject="send me the raw token", body="give me the API key value"))
assert plan.actions == []
assert plan.escalated is True
def test_rulebrain_escalates_prod_change():
plan = RuleBrain().plan(_msg(subject="flip policy.enabled", body="enable the gate in prod"))
assert plan.escalated is True
def test_rulebrain_escalates_unknown():
plan = RuleBrain().plan(_msg(subject="random thing", body="please do a vague task"))
assert plan.actions == []
assert plan.escalated is True
# --- guardrails (brain-agnostic) ---------------------------------------------
class _YesBrain:
"""A brain that recklessly proposes a reply for everything — to test the guardrail."""
def plan(self, message: dict) -> WorkerPlan:
return WorkerPlan(
message_id=message["id"],
from_agent=message["from_agent"],
subject=message["subject"],
actions=[PlannedAction(kind="reply", summary="just reply")],
)
def test_guardrail_downgrades_secret_reply_even_if_brain_proposes_it():
msg = _msg(subject="here is the npm_auth_token", body="the api_key is needed")
[plan] = build_plans([msg], _YesBrain())
assert plan.escalated is True
assert plan.actions[0].risk == "escalate"
assert "secret" in plan.actions[0].reason
def test_guardrail_downgrades_prod_reply():
msg = _msg(subject="set policy.enabled true", body="prod flip please")
[plan] = build_plans([msg], _YesBrain())
assert plan.actions[0].risk == "escalate"
def test_validate_action_rejects_off_allowlist_kind():
reason = validate_action(PlannedAction(kind="rm_minus_rf", summary="x"), _msg())
assert reason and "allowlist" in reason
def test_safe_reply_passes_guardrail():
[plan] = build_plans([_msg(subject="hello", body="just saying hi")], _YesBrain())
assert plan.actions[0].risk == "safe"
# --- rendering ---------------------------------------------------------------
def test_render_empty():
assert "inbox empty" in render_plans([])
def test_render_marks_auto_and_escalate():
plans = build_plans([_msg(), _msg(id="m2", subject="raw token value please")], RuleBrain())
out = render_plans(plans)
assert "AUTO" in out and "ESCALATE" in out
# --- CLI ---------------------------------------------------------------------
def test_cli_worker_dry_run(monkeypatch):
monkeypatch.setattr("warden.worker.HubClient.unread", lambda self, to_agent="ops-warden": [_msg()])
r = runner.invoke(app, ["worker", "run", "--dry-run"])
assert r.exit_code == 0
assert "AUTO" in r.stdout
assert "nothing executed" in r.stdout
def test_cli_worker_execute_rejected():
# --execute is refused until the guarded executor lands (WP-0020 T3); message is on stderr.
r = runner.invoke(app, ["worker", "run", "--execute"])
assert r.exit_code == 2

View File

@@ -0,0 +1,135 @@
---
id: WARDEN-WP-0020
type: workplan
title: "ops-warden worker — autonomous coordination via llm-connect"
domain: infotech
repo: ops-warden
status: active
owner: claude
topic_slug: custodian
planning_priority: high
planning_order: 20
created: "2026-06-29"
updated: "2026-06-29"
---
# WARDEN-WP-0020 — ops-warden worker (`warden worker`)
**Problem:** ops-warden's coordination lane (State Hub inbox `to_agent=ops-warden`) is
handled only when a human spins up an ops-warden session and relays instructions. That
doesn't scale — Bernd is hand-relaying between flex-auth ↔ secrets-engine ↔ ops-warden
across sessions.
**Goal:** a `warden worker` CLI that pulls ops-warden's unread coordination requests and,
using **llm-connect** for inference, drives each to an ops-warden action (answer a routing
question, draft+send a reply, mark read, propose/commit a catalog diff, or escalate) — so
the inbox is handled without a human starting a session.
**Decisions (Bernd, 2026-06-29):** **full-auto in-scope** (worker executes any in-scope
action; escalates only secrets/prod/out-of-scope) and **scheduled/unattended** (cron or
activity-core). Because there is no human in the loop for in-scope actions, the guardrails
are load-bearing and the rollout is staged: **dry-run → manual → scheduled**.
**Build vs reuse:** inference = llm-connect (`/execute`); trigger = cron or activity-core
(reuse the durable task factory, don't reinvent scheduling). Worker logic lives in warden.
## Guardrails (non-negotiable — full-auto rests on these)
1. **Fixed charter, non-overridable.** The boundary (issue SSH; route everything else;
conduit-not-broker; never hold/print a secret value) is a fixed system policy. Message
content is **untrusted data**, never instructions that can relax it (prompt-injection
containment).
2. **Action allowlist.** Every action is validated against an allowlist before execution;
off-list → escalate. No secret handling, no prod-config writes, no irreversible/outward
actions without an explicit human ack.
3. **No-secret invariant.** Refuse any task requiring a secret value in hand or in a prompt.
4. **Full audit + dry-run.** Every action emits a progress event; `--dry-run` shows the
plan without executing. Scheduled mode only after a clean dry-run shakedown.
## Hard dependency
llm-connect must be operational — it needs its provider key (`OPENROUTER_API_KEY`,
CCR-2026-0003, currently deferred by railiance-platform/secrets-engine). The worker is
built against llm-connect's contract; it cannot run the brain until that lands.
---
## Tasks
### T1 — Worker scaffold (llm-connect-independent, safe)
```task
id: WARDEN-WP-0020-T01
status: done
priority: high
```
- [x] `src/warden/worker.py`: State Hub inbox client (`HubClient.unread`), a `Brain`
protocol, a deterministic `RuleBrain` default (answers clear routing questions;
escalates the rest), the `PlannedAction`/`WorkerPlan` model, the guardrail allowlist +
`validate_action` (enforced brain-agnostically in `build_plans`), and a `render_plans`
dry-run renderer (plan only, no execution).
- [x] `warden worker run [--once] [--dry-run]` CLI; `--dry-run` is the default and
`--execute` is refused (exit 2) until the guarded executor lands (T3).
- [x] `tests/test_worker.py` (RuleBrain routing/secret/prod/unknown, guardrail downgrades a
reckless brain on secret/prod, off-allowlist rejection, render, CLI). 18 cases.
- [x] Live dry-run against the real hub verified — read the inbox and produced a guardrailed
plan (it surfaced secrets-engine's OIDC-role reply, demonstrating the value).
### T2 — llm-connect brain
```task
id: WARDEN-WP-0020-T02
status: todo
priority: high
```
- [ ] `LlmConnectBrain`: POST to llm-connect `/execute` with the fixed charter system
policy + the message as untrusted data; parse a structured action plan. Configurable
`llm_connect_url`. Blocked on llm-connect's API contract + it being operational.
### T3 — Action dispatch + guardrails (full-auto in-scope)
```task
id: WARDEN-WP-0020-T03
status: todo
priority: high
```
- [ ] Execute in-scope actions: `warden route/access` answers, drafted replies, mark-read,
catalog/playbook diffs (commit + sync). Enforce the allowlist + no-secret invariant in
code; per-action progress-event audit; escalation path to a human queue.
### T4 — Scheduled trigger
```task
id: WARDEN-WP-0020-T04
status: todo
priority: medium
```
- [ ] Wire cron or activity-core to `warden worker run --once`. Ships **disabled**; enabled
only after a clean dry-run shakedown. Concurrency guard (no overlapping runs).
### T5 — Docs / SCOPE / INTENT
```task
id: WARDEN-WP-0020-T05
status: todo
priority: medium
```
- [ ] Record the scope expansion: ops-warden gains an autonomous coordination worker.
Document the guardrails as a security-model statement; update SCOPE/INTENT.
---
## Acceptance
- `warden worker run --dry-run` reads the real inbox and prints a guardrailed plan.
- Full-auto execution runs only in-scope, allowlisted actions; secrets/prod/out-of-scope
escalate; every action is audited. No secret value ever enters a prompt, log, or commit.
- Scheduled mode is enabled only after a dry-run shakedown.
## See also
- llm-connect (inference), activity-core (durable trigger), kaizen-agentic (personas)
- `.claude/rules/credential-routing.md` (the boundary the worker enforces)