Files
the-custodian/runtime/actions.py
tegwick 2fdbcb5d7a feat(CUST-WP-0001): implement Custodian Agent Runtime bootstrap
T2 complete: OODA loop skeleton with LLM integration, bounded actions,
and 32 offline unit tests.

Deliverables:
- runtime/agent.py     — CLI entry point (--domain/--all/--dry-run/--llm)
- runtime/context.py   — Observe: fetch_state + build_context
- runtime/actions.py   — Act: parse_plan + execute (3 sanctioned writes)
- runtime/README.md    — usage guide and architecture overview
- runtime/tests/       — 32 tests, fully offline
- runtime/pyproject.toml — standalone package with llm-connect dep
- canon/architecture/adr-002-custodian-agent-runtime-design.md

Key design decisions (ADR-002):
- Lives in runtime/ (not a new repo) — tight canon/state-hub coupling
- ClaudeCodeAdapter by default (local-first, no API key)
- Single-pass synchronous OODA for v0.1 simplicity
- Exactly 3 sanctioned write ops: add_progress_event, update_task_status, flag_for_human
- LLM returns JSON block in markdown for structured+auditable output

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 22:36:24 +01:00

152 lines
4.7 KiB
Python

"""Bounded action executor — only sanctioned write operations.
Act step of the OODA loop. Parses the LLM's JSON plan and executes
exactly the three operations permitted by the constitution:
1. add_progress_event — append-only observation log entry
2. update_task_status — reversible task status change
3. flag_for_human — escalation flag (not an action, a signal)
"""
from __future__ import annotations

import json
import re
from typing import Any
from urllib.parse import quote

import httpx

from context import API_BASE
# Allow-list of write operations (ADR-002 D4): there are exactly three, and
# the set is frozen so it cannot be extended at runtime.
SANCTIONED_ACTIONS = frozenset(
    (
        "add_progress_event",
        "update_task_status",
        "flag_for_human",
    )
)
# Matches the body of the first ```json fenced block; DOTALL lets the body
# span multiple lines.
_JSON_BLOCK_RE = re.compile(r"```json\s*\n(.*?)\n```", re.DOTALL)

# Shape of a plan that requests no actions. Treat as read-only: parse_plan()
# hands out fresh lists rather than these, so callers can never mutate it.
_EMPTY_PLAN: dict[str, Any] = {
    "progress_events": [],
    "tasks_to_update": [],
    "tasks_to_flag": [],
}


def _empty_plan() -> dict[str, Any]:
    """Return a new empty plan whose lists are not shared with any other plan."""
    return {key: [] for key in _EMPTY_PLAN}


def parse_plan(llm_response: str) -> dict[str, Any]:
    """Extract the JSON action plan from the LLM's markdown response.

    Finds the first ```json ... ``` block and parses it. The response is
    untrusted model output, so anything malformed degrades to "no actions"
    instead of raising.

    Args:
        llm_response: Raw LLM output (markdown with embedded JSON block).
            ``None`` or an empty string is treated as "no plan".

    Returns:
        On success, a dict with keys ``observations``, ``progress_events``,
        ``tasks_to_update`` and ``tasks_to_flag``. The three action keys are
        always lists (a missing or non-list value becomes ``[]``);
        ``observations`` is passed through unchanged (default ``[]``) and is
        not acted on.

        If there is no JSON block, the block is not valid JSON, or the JSON
        is not an object, an empty plan with only the three action keys is
        returned. Every returned plan owns its lists.
    """
    match = _JSON_BLOCK_RE.search(llm_response or "")
    if match is None:
        return _empty_plan()

    try:
        raw = json.loads(match.group(1))
    except (ValueError, RecursionError):
        # json.JSONDecodeError subclasses ValueError; RecursionError covers
        # pathologically nested input.
        return _empty_plan()

    # Valid JSON that is not an object (list, string, null, ...) is not a plan.
    if not isinstance(raw, dict):
        return _empty_plan()

    plan: dict[str, Any] = {"observations": raw.get("observations", [])}
    for key in _EMPTY_PLAN:
        value = raw.get(key)
        # Guarantee an iterable list so the executor never trips on null/str.
        plan[key] = value if isinstance(value, list) else []
    return plan
def _dict_entries(plan: dict[str, Any], key: str) -> list[dict[str, Any]]:
    """Return the dict items stored under ``plan[key]``, ignoring anything else."""
    entries = plan.get(key)
    if not isinstance(entries, (list, tuple)):
        return []
    return [entry for entry in entries if isinstance(entry, dict)]


def _text_field(entry: dict[str, Any], key: str) -> str:
    """Return ``entry[key]`` stripped, or "" when it is missing or not a string."""
    value = entry.get(key)
    return value.strip() if isinstance(value, str) else ""


def _task_url(base: str, task_id: str) -> str:
    """Build the task endpoint URL with ``task_id`` escaped as one path segment."""
    # task_id comes from the LLM; escaping "/", "?" and "#" keeps the request
    # on /tasks/<id>/ so it cannot be steered to a different route.
    return f"{base}/tasks/{quote(task_id, safe='')}/"


def _perform(
    send: Any,
    url: str,
    payload: dict[str, Any],
    desc: str,
    dry_run: bool,
) -> str:
    """Run one write (or describe it in dry-run mode) and return its result line."""
    if dry_run:
        return f"[dry-run] {desc}"
    try:
        resp = send(url, json=payload, timeout=10.0)
        resp.raise_for_status()
    except Exception as exc:
        # Deliberate best-effort boundary: one failed write is reported in the
        # results and must not abort the remaining actions.
        return f"✗ failed {desc}: {exc}"
    return desc


def execute(
    plan: dict[str, Any],
    api_base: str = API_BASE,
    dry_run: bool = False,
) -> list[str]:
    """Execute sanctioned actions from the plan.

    Actions run in a fixed order: progress events (POST ``/progress/``), task
    status updates (PATCH ``/tasks/<id>/``), then human flags (PATCH
    ``/tasks/<id>/``). Each request uses a 10 second timeout.

    The plan originates from LLM output, so entries are validated
    defensively: an entry that is not a dict, or whose required text field
    (``summary``; ``task_id`` and ``status``; ``task_id``) is missing, empty
    or not a string, is skipped without producing a result line.

    Args:
        plan: Parsed plan dict from parse_plan().
        api_base: Base URL for the state-hub API.
        dry_run: If True, describe actions without making any HTTP calls.

    Returns:
        List of human-readable result strings (one per action attempted).
        Failed requests are reported as ``✗ failed ...`` lines rather than
        raised.
    """
    results: list[str] = []
    base = api_base.rstrip("/")

    # 1. Progress events (add_progress_event)
    for event in _dict_entries(plan, "progress_events"):
        summary = _text_field(event, "summary")
        if not summary:
            continue
        payload = {
            "summary": summary,
            "event_type": event.get("event_type", "note"),
        }
        if event.get("workstream_id"):
            payload["workstream_id"] = event["workstream_id"]
        desc = f"add_progress_event: {summary!r}"
        results.append(
            _perform(httpx.post, f"{base}/progress/", payload, desc, dry_run)
        )

    # 2. Task status updates (update_task_status)
    for update in _dict_entries(plan, "tasks_to_update"):
        task_id = _text_field(update, "task_id")
        status = _text_field(update, "status")
        if not task_id or not status:
            continue
        desc = f"update_task_status: {task_id[:8]}… → {status!r}"
        results.append(
            _perform(
                httpx.patch,
                _task_url(base, task_id),
                {"status": status},
                desc,
                dry_run,
            )
        )

    # 3. Human flags (flag_for_human)
    for flag in _dict_entries(plan, "tasks_to_flag"):
        task_id = _text_field(flag, "task_id")
        if not task_id:
            continue
        note = _text_field(flag, "note")
        desc = f"flag_for_human: {task_id[:8]}… — {note!r}"
        results.append(
            _perform(
                httpx.patch,
                _task_url(base, task_id),
                {"needs_human": True, "intervention_note": note},
                desc,
                dry_run,
            )
        )

    return results