feat(CUST-WP-0001): implement Custodian Agent Runtime bootstrap

T2 complete: OODA loop skeleton with LLM integration, bounded actions,
and 32 offline unit tests.

Deliverables:
- runtime/agent.py     — CLI entry point (--domain/--all/--dry-run/--llm)
- runtime/context.py   — Observe: fetch_state + build_context
- runtime/actions.py   — Act: parse_plan + execute (3 sanctioned writes)
- runtime/README.md    — usage guide and architecture overview
- runtime/tests/       — 32 tests, fully offline
- runtime/pyproject.toml — standalone package with llm-connect dep
- canon/architecture/adr-002-custodian-agent-runtime-design.md

Key design decisions (ADR-002):
- Lives in runtime/ (not a new repo) — tight canon/state-hub coupling
- ClaudeCodeAdapter by default (local-first, no API key)
- Single-pass synchronous OODA for v0.1 simplicity
- Exactly 3 sanctioned write ops: add_progress_event, update_task_status, flag_for_human
- LLM returns JSON block in markdown for structured+auditable output

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 22:36:24 +01:00
parent 5358d417ec
commit 2fdbcb5d7a
11 changed files with 1227 additions and 3 deletions


@@ -0,0 +1,122 @@
---
id: ADR-002
type: architecture-decision-record
title: "Custodian Agent Runtime — v0.1 Bootstrap Design"
status: accepted
decided_by: Bernd Worsch
date: "2026-03-12"
tags: ["architecture", "agent-runtime", "llm", "ooda", "bounded-agency"]
---
# ADR-002: Custodian Agent Runtime — v0.1 Bootstrap Design
## Status
Accepted.
## Context
CUST-WP-0001 requires a first working skeleton of the Custodian as an acting
agent: a loop that observes project state, reasons about it, and executes
bounded write operations — without human interaction for each step.
The dependencies (llm-connect, operational Railiance infra) are now resolved.
This ADR captures the five key architectural decisions for the v0.1 bootstrap.
## Decisions
### D1 — Location: `runtime/` inside the-custodian (not a new repo)
**Decision:** The runtime lives under `the-custodian/runtime/` as a standalone
Python package (`pyproject.toml`, own venv) rather than a new repository.
**Rationale:** The runtime is tightly coupled to canon (reads constitution,
memory) and the state-hub (its primary coordination layer). A separate repo
adds friction with no v0.1 benefit. The existing `runtime/` scaffold confirms
the original intent. Extraction to its own repo is deferred to when the
runtime has stable boundaries and multiple consumers.
### D2 — OODA loop: single-pass synchronous CLI
**Decision:** One `run()` call = one complete Observe → Orient → Decide → Act
cycle. Entry point is a CLI (`agent.py`) invoked manually or by a cron job.
```
Observe — HTTP GET to state-hub: state summary or domain summary
Orient — Load constitution + build structured LLM context prompt
Decide — Single LLM call (via llm-connect) returns a JSON action plan
Act — Execute only sanctioned write operations from the plan
```
**Rationale:** Async event loops and daemons add operational complexity that
v0.1 doesn't need. A single-pass CLI is testable, debuggable, and can be
scheduled externally. The transition to an event-driven loop is Phase 2.
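The single-pass shape can be illustrated with a minimal sketch. Everything in it is hypothetical: the `run_cycle` name and the four stage callables are assumptions made for illustration, not the runtime's actual API. It only shows that one call runs each stage once, in order, and then returns.

```python
from typing import Any, Callable


def run_cycle(
    observe: Callable[[], dict[str, Any]],
    orient: Callable[[dict[str, Any]], str],
    decide: Callable[[str], str],
    act: Callable[[str], list[str]],
) -> list[str]:
    """Run each OODA stage exactly once and return the action results."""
    state = observe()          # Observe: gather state
    prompt = orient(state)     # Orient: turn state into a prompt
    response = decide(prompt)  # Decide: a single (stubbed) LLM call
    return act(response)       # Act: execute the plan, then exit


# Stub stages stand in for state-hub and the LLM so the sketch runs offline.
results = run_cycle(
    observe=lambda: {"totals": {"tasks": {"todo": 3}}},
    orient=lambda state: f"todo={state['totals']['tasks']['todo']}",
    decide=lambda prompt: f"plan for {prompt}",
    act=lambda response: [f"[dry-run] {response}"],
)
print(results)
```

Because every stage is passed in, each one can be replaced by a stub in tests, which is the testability argument made above.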
### D3 — LLM backend: ClaudeCodeAdapter by default
**Decision:** The runtime uses `llm_connect.ClaudeCodeAdapter` as its default
LLM backend (shells out to `claude --print`). Provider is configurable via
`--llm` flag to support `gemini`, `openrouter`, or `openai`.
**Rationale:** `ClaudeCodeAdapter` requires no API key and honours the
Local-First value (V2). All current deployments have Claude Code available.
The llm-connect abstraction means switching providers is a one-line change.
### D4 — Action constraint: three sanctioned write operations only
**Decision:** The agent may execute exactly three state-hub write operations
without human approval:
1. `add_progress_event` — append an observation to the event log
2. `update_task_status` — mark a task done/in_progress (reversible)
3. `flag_for_human` — raise an intervention flag (escalation, not action)
All other operations (create workstream, record decision, resolve decision,
write to canon) require human approval before execution.
**Rationale:** Constitution §3/§4 require bounded agency. The three operations
are either append-only (progress events), reversible (task status), or
explicitly escalating (flag). They cannot produce irreversible harm.
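One way to make this bound explicit in code is an allow-list check before dispatch. The sketch below is an assumption for illustration only: `filter_sanctioned` and the `(operation, payload)` pair format are hypothetical and not part of the runtime as committed.

```python
# The three operation names come from D4; everything else here is hypothetical.
SANCTIONED = frozenset({
    "add_progress_event",
    "update_task_status",
    "flag_for_human",
})


def filter_sanctioned(requested):
    """Split requested (operation, payload) pairs into allowed and rejected."""
    allowed, rejected = [], []
    for operation, payload in requested:
        if operation in SANCTIONED:
            allowed.append((operation, payload))
        else:
            # Anything outside the allow-list is held back for human approval.
            rejected.append((operation, payload))
    return allowed, rejected


allowed, rejected = filter_sanctioned([
    ("add_progress_event", {"summary": "reviewed domain"}),
    ("delete_task", {"task_id": "t1"}),
])
print(allowed)
print(rejected)
```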
### D5 — LLM response format: JSON block in markdown
**Decision:** The LLM is prompted to return a Markdown response containing a
fenced `json` code block with the structured action plan:
```json
{
  "observations": ["..."],
  "progress_events": [
    {"summary": "...", "workstream_id": "...", "event_type": "note"}
  ],
  "tasks_to_update": [
    {"task_id": "...", "status": "done"}
  ],
  "tasks_to_flag": [
    {"task_id": "...", "note": "..."}
  ]
}
```
The surrounding Markdown is preserved as a human-readable reasoning trace
and written to `memory/working/` as a session note.
**Rationale:** JSON blocks are robust to extraction (delimited), LLMs produce
them reliably with clear instructions, and the surrounding prose gives Bernd
an auditable reasoning trace without requiring a separate reasoning step.
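The extraction step can be sketched as follows. This is a minimal illustration, not the committed parser: `extract_plan` and the sample response are hypothetical, and the fence marker is built programmatically only so that this example does not contain a literal triple backtick.

```python
import json
import re

FENCE = "`" * 3  # three backticks, built indirectly to keep this example fence-safe
JSON_BLOCK_RE = re.compile(FENCE + r"json\s*\n(.*?)\n" + FENCE, re.DOTALL)


def extract_plan(markdown: str) -> dict:
    """Return the first fenced JSON object in the response, or {} on failure."""
    match = JSON_BLOCK_RE.search(markdown)
    if match is None:
        return {}
    try:
        plan = json.loads(match.group(1))
    except json.JSONDecodeError:
        return {}
    # A JSON array or scalar parses fine but is not a plan.
    return plan if isinstance(plan, dict) else {}


# Hypothetical LLM response: prose around one fenced JSON block.
response = "\n".join([
    "Two workstreams are active; nothing is blocked.",
    FENCE + "json",
    '{"observations": ["all good"], "progress_events": []}',
    FENCE,
    "No escalation needed.",
])
plan = extract_plan(response)
print(plan)
```

The prose before and after the block is untouched by the extraction, so it can be stored separately as the reasoning trace.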
## Consequences
- `runtime/` becomes a standalone Python package; `make agent-run DOMAIN=x`
invokes it.
- The runtime has no DB schema changes and no new API endpoints — it is a
pure client of the existing state-hub HTTP API.
- Autonomous actions are limited to append-only writes, reversible task-status
changes, and escalations. Any expansion of the action surface requires a new
ADR and human approval.
- The v0.1 loop is single-user (Bernd). Multi-agent expansion is Phase 2+.
## Deferred
- Async event loop / daemon mode (Phase 2)
- RAG over canon (Phase 1 roadmap item)
- Tool adapters beyond state-hub HTTP (planned in `runtime/tool_adapters/`)
- Deployment on Railiance k3s as a scheduled CronJob

runtime/README.md

@@ -0,0 +1,109 @@
# Custodian Agent Runtime — v0.1
Single-pass OODA agent loop. Observes project state via the State Hub, reasons
about it with an LLM, and executes bounded write operations.
## Architecture
```
agent.py          CLI entry point + OODA orchestrator
context.py        Observe: fetch state-hub data + build LLM context prompt
actions.py        Act: execute sanctioned write operations
prompts/          System prompt templates (future)
policies/         Agent-level policies (future)
tool_adapters/    Additional MCP/API tool adapters (future)
tests/            Unit tests (offline, no live API required)
```
See `canon/architecture/adr-002-custodian-agent-runtime-design.md` for
all architectural decisions.
## OODA Loop
```
Observe → fetch_state(domain)          # HTTP GET /state/summary or /state/domain/{slug}
Orient  → load_constitution()          # reads canon/constitution/
          build_context(state, const)  # assembles LLM prompt
Decide  → LLM call via llm-connect     # returns markdown + JSON action plan
Act     → parse_plan(response)         # extract JSON block
          execute(plan)                # run sanctioned writes
```
## Sanctioned Write Operations (ADR-002 D4)
Only three operations may be executed without human approval:
| Operation | State-hub endpoint | Reversible |
|---|---|---|
| `add_progress_event` | `POST /progress/` | N/A (append-only log) |
| `update_task_status` | `PATCH /tasks/{id}/` | Yes |
| `flag_for_human` | `PATCH /tasks/{id}/` | Yes (clear with `clear_human_flag`) |
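
The table can also be read as a mapping from plan entries to HTTP requests. The sketch below only builds request descriptions and sends nothing. `build_requests` is a hypothetical helper introduced for illustration; the paths and payload fields mirror `actions.py` in this commit.

```python
def build_requests(plan: dict) -> list[tuple[str, str, dict]]:
    """Describe the (method, path, JSON payload) each plan entry would produce."""
    requests = []
    for event in plan.get("progress_events", []):
        payload = {
            "summary": event["summary"],
            "event_type": event.get("event_type", "note"),
        }
        if event.get("workstream_id"):
            payload["workstream_id"] = event["workstream_id"]
        requests.append(("POST", "/progress/", payload))
    for update in plan.get("tasks_to_update", []):
        path = f"/tasks/{update['task_id']}/"
        requests.append(("PATCH", path, {"status": update["status"]}))
    for flag in plan.get("tasks_to_flag", []):
        path = f"/tasks/{flag['task_id']}/"
        payload = {"needs_human": True, "intervention_note": flag.get("note", "")}
        requests.append(("PATCH", path, payload))
    return requests


# Hypothetical plan with one entry per sanctioned operation.
sample_plan = {
    "progress_events": [{"summary": "reviewed domain"}],
    "tasks_to_update": [{"task_id": "t1", "status": "done"}],
    "tasks_to_flag": [{"task_id": "t2", "note": "needs review"}],
}
requests = build_requests(sample_plan)
for method, path, payload in requests:
    print(method, path, payload)
```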
## Prerequisites
- State-hub running: `cd state-hub && make api`
- LLM available: `claude` CLI in PATH (for default `claude-code` provider)
or set `OPENROUTER_API_KEY` / `GEMINI_API_KEY` for other providers
## Install
```bash
cd runtime
uv sync
```
## Usage
```bash
cd runtime
# Focus on custodian domain (cheaper — ~10% of full summary tokens)
uv run python agent.py --domain custodian
# Full cross-domain view
uv run python agent.py --all
# Preview actions without executing
uv run python agent.py --domain custodian --dry-run
# Use a different LLM provider
uv run python agent.py --domain custodian --llm gemini
uv run python agent.py --domain custodian --llm openrouter
# Custom state-hub URL
uv run python agent.py --domain custodian --api-base http://10.0.0.5:8000
```
## Output
The agent prints a trace to stdout:
```
[custodian-agent] 2026-03-12T20:00:00 scope=domain=custodian
[observe] fetching state from state-hub…
[orient] loading constitution and building context…
[decide] calling LLM via provider='claude-code'…
[act] executing plan (live): 1 events, 0 task updates, 0 flags
✓ add_progress_event: 'Reviewed custodian domain: 2 active workstreams…'
[custodian-agent] done — 1 actions.
```
The LLM's reasoning trace is saved to `memory/working/agent-session-{ts}-{scope}.md`.
## Tests
```bash
cd runtime
uv run pytest -v
```
All 32 tests run offline (no live state-hub, no LLM API key required).
## Extending
- **New observations**: extend `build_context()` in `context.py`
- **New actions**: add to `actions.py` and update `SANCTIONED_ACTIONS` — but
any expansion of the action surface requires a new ADR and human approval
(see constitution §2§4)
- **Tool adapters**: add to `tool_adapters/` following the llm-connect
`LLMAdapter` pattern

runtime/actions.py

@@ -0,0 +1,151 @@
"""Bounded action executor — only sanctioned write operations.

Act step of the OODA loop. Parses the LLM's JSON plan and executes
exactly the three operations permitted by the constitution:

  1. add_progress_event — append-only observation log entry
  2. update_task_status — reversible task status change
  3. flag_for_human     — escalation flag (not an action, a signal)
"""
from __future__ import annotations

import copy
import json
import re
from typing import Any

import httpx

from context import API_BASE

# Exactly three write operations are sanctioned (ADR-002 D4).
SANCTIONED_ACTIONS = frozenset({
    "add_progress_event",
    "update_task_status",
    "flag_for_human",
})

_JSON_BLOCK_RE = re.compile(r"```json\s*\n(.*?)\n```", re.DOTALL)

_EMPTY_PLAN: dict[str, Any] = {
    "progress_events": [],
    "tasks_to_update": [],
    "tasks_to_flag": [],
}


def _empty_plan() -> dict[str, Any]:
    """Return a fresh copy so callers never share the template's lists."""
    return copy.deepcopy(_EMPTY_PLAN)


def parse_plan(llm_response: str) -> dict[str, Any]:
    """Extract the JSON action plan from the LLM's markdown response.

    Finds the first ```json ... ``` block, parses it, and fills in missing
    keys with empty defaults. Returns an empty plan on any parse failure.

    Args:
        llm_response: Raw LLM output (markdown with embedded JSON block).

    Returns:
        Plan dict with keys: progress_events, tasks_to_update, tasks_to_flag.
        (observations key is preserved but not acted on.)
    """
    match = _JSON_BLOCK_RE.search(llm_response)
    if not match:
        return _empty_plan()

    try:
        raw = json.loads(match.group(1))
    except (json.JSONDecodeError, ValueError):
        return _empty_plan()

    # A JSON array or scalar parses successfully but is not a plan.
    if not isinstance(raw, dict):
        return _empty_plan()

    # Ensure all required keys are present; `or []` also covers explicit nulls.
    return {
        "observations": raw.get("observations") or [],
        "progress_events": raw.get("progress_events") or [],
        "tasks_to_update": raw.get("tasks_to_update") or [],
        "tasks_to_flag": raw.get("tasks_to_flag") or [],
    }


def execute(
    plan: dict[str, Any],
    api_base: str = API_BASE,
    dry_run: bool = False,
) -> list[str]:
    """Execute sanctioned actions from the plan.

    Args:
        plan: Parsed plan dict from parse_plan().
        api_base: Base URL for the state-hub API.
        dry_run: If True, describe actions without making any HTTP calls.

    Returns:
        List of human-readable result strings (one per action attempted).
    """
    results: list[str] = []

    # 1. Progress events (add_progress_event)
    for event in plan.get("progress_events") or []:
        # The prompt allows null for optional fields, so guard against None.
        summary = str(event.get("summary") or "").strip()
        if not summary:
            continue
        desc = f"add_progress_event: {summary!r}"
        if dry_run:
            results.append(f"[dry-run] {desc}")
            continue
        payload = {
            "summary": summary,
            "event_type": event.get("event_type") or "note",
        }
        if event.get("workstream_id"):
            payload["workstream_id"] = event["workstream_id"]
        try:
            resp = httpx.post(
                api_base.rstrip("/") + "/progress/",
                json=payload,
                timeout=10.0,
            )
            resp.raise_for_status()
            results.append(f"✓ {desc}")
        except Exception as exc:
            results.append(f"✗ failed {desc}: {exc}")

    # 2. Task status updates (update_task_status)
    for update in plan.get("tasks_to_update") or []:
        task_id = str(update.get("task_id") or "").strip()
        status = str(update.get("status") or "").strip()
        if not task_id or not status:
            continue
        desc = f"update_task_status: {task_id[:8]}… → {status!r}"
        if dry_run:
            results.append(f"[dry-run] {desc}")
            continue
        try:
            resp = httpx.patch(
                api_base.rstrip("/") + f"/tasks/{task_id}/",
                json={"status": status},
                timeout=10.0,
            )
            resp.raise_for_status()
            results.append(f"✓ {desc}")
        except Exception as exc:
            results.append(f"✗ failed {desc}: {exc}")

    # 3. Human flags (flag_for_human)
    for flag in plan.get("tasks_to_flag") or []:
        task_id = str(flag.get("task_id") or "").strip()
        note = str(flag.get("note") or "").strip()
        if not task_id:
            continue
        desc = f"flag_for_human: {task_id[:8]}… — {note!r}"
        if dry_run:
            results.append(f"[dry-run] {desc}")
            continue
        try:
            resp = httpx.patch(
                api_base.rstrip("/") + f"/tasks/{task_id}/",
                json={"needs_human": True, "intervention_note": note},
                timeout=10.0,
            )
            resp.raise_for_status()
            results.append(f"✓ {desc}")
        except Exception as exc:
            results.append(f"✗ failed {desc}: {exc}")

    return results

runtime/agent.py

@@ -0,0 +1,134 @@
"""Custodian Agent Runtime — single OODA cycle entry point.

Usage:
    uv run python agent.py --domain custodian
    uv run python agent.py --all
    uv run python agent.py --domain custodian --dry-run
    uv run python agent.py --domain custodian --llm gemini

The agent runs one complete Observe → Orient → Decide → Act cycle and exits.
Progress is printed to stdout and errors to stderr. Errors are non-fatal
where possible.

See ADR-002 for architecture decisions.
"""
from __future__ import annotations

import argparse
import datetime
import sys
from pathlib import Path

from actions import execute, parse_plan
from context import API_BASE, build_context, fetch_state, load_constitution

try:
    from llm_connect import RunConfig, create_adapter
    _HAS_LLM_CONNECT = True
except ImportError:
    _HAS_LLM_CONNECT = False


def run(
    domain: str | None,
    dry_run: bool,
    llm_provider: str,
    api_base: str = API_BASE,
) -> int:
    """Execute one OODA cycle. Returns exit code (0 = ok, 1 = error)."""
    scope = f"domain={domain}" if domain else "all domains"
    print(f"[custodian-agent] {datetime.datetime.now().isoformat(timespec='seconds')} scope={scope}")

    # --- Observe ---
    print("[observe] fetching state from state-hub…")
    state = fetch_state(domain=domain, api_base=api_base)
    if not state:
        print("[observe] WARNING: state-hub unreachable or returned empty state. "
              "Proceeding with empty context (graceful degradation).")

    # --- Orient ---
    print("[orient] loading constitution and building context…")
    constitution = load_constitution()
    context = build_context(state, constitution)

    # --- Decide ---
    if not _HAS_LLM_CONNECT:
        print("[decide] ERROR: llm-connect not available. Run `uv sync` first.", file=sys.stderr)
        return 1

    print(f"[decide] calling LLM via provider={llm_provider!r}")
    try:
        adapter = create_adapter(llm_provider)
        config = RunConfig(temperature=0.3, max_tokens=2000)
        llm_response = adapter.execute_prompt(context, config)
        response_text = llm_response.content
    except Exception as exc:
        print(f"[decide] ERROR: LLM call failed: {exc}", file=sys.stderr)
        return 1

    # Save reasoning trace to working memory
    _save_session_note(response_text, domain)

    # --- Act ---
    plan = parse_plan(response_text)
    mode = "dry-run" if dry_run else "live"
    print(f"[act] executing plan ({mode}): "
          f"{len(plan['progress_events'])} events, "
          f"{len(plan['tasks_to_update'])} task updates, "
          f"{len(plan['tasks_to_flag'])} flags")

    results = execute(plan, api_base=api_base, dry_run=dry_run)
    for r in results:
        print(f"  {r}")

    print(f"[custodian-agent] done — {len(results)} actions.")
    return 0


def _save_session_note(response_text: str, domain: str | None) -> None:
    """Write the LLM reasoning trace to working memory as a new session note."""
    memory_dir = Path(__file__).parent.parent / "memory" / "working"
    if not memory_dir.exists():
        return
    timestamp = datetime.datetime.now().strftime("%Y-%m-%dT%H%M%S")
    scope = domain or "all"
    note_path = memory_dir / f"agent-session-{timestamp}-{scope}.md"
    try:
        note_path.write_text(
            f"---\ntype: agent-session-note\nscope: {scope}\ntimestamp: {timestamp}\n---\n\n"
            + response_text,
            encoding="utf-8",
        )
    except Exception:
        pass  # Non-fatal — memory write failure does not stop the cycle


def main() -> None:
    parser = argparse.ArgumentParser(
        description="Custodian Agent — single OODA cycle (Observe→Orient→Decide→Act)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  uv run python agent.py --domain custodian
  uv run python agent.py --domain custodian --dry-run
  uv run python agent.py --all --llm gemini
""",
    )
    scope = parser.add_mutually_exclusive_group(required=True)
    scope.add_argument("--domain", metavar="SLUG",
                       help="Focus on a single domain (e.g. custodian, railiance)")
    scope.add_argument("--all", action="store_true",
                       help="Run over full cross-domain state summary")
    parser.add_argument("--dry-run", action="store_true",
                        help="Print planned actions without executing them")
    parser.add_argument("--llm", default="claude-code", metavar="PROVIDER",
                        help="LLM provider: claude-code (default), gemini, openrouter, openai")
    parser.add_argument("--api-base", default=API_BASE, metavar="URL",
                        help=f"State-hub API base URL (default: {API_BASE})")

    args = parser.parse_args()
    domain = args.domain if not args.all else None
    sys.exit(run(domain, args.dry_run, args.llm, args.api_base))


if __name__ == "__main__":
    main()

runtime/context.py

@@ -0,0 +1,160 @@
"""Observation layer — fetches state-hub data and builds LLM context.

Observe step of the OODA loop:
  - fetch_state: HTTP GET to state-hub /state/summary or /state/domain/{slug}
  - load_constitution: reads the custodian constitution for the system prompt
  - build_context: assembles the full prompt sent to the LLM
"""
from __future__ import annotations

import json
from pathlib import Path

import httpx

API_BASE = "http://127.0.0.1:8000"
CONSTITUTION_PATH = Path(__file__).parent.parent / "canon" / "constitution" / "custodian_constitution_v0.1.md"

_CONSTITUTION_FALLBACK = (
    "You are the Custodian agent. Act as a bounded co-creator. "
    "Do not take irreversible actions. Escalate when uncertain."
)


def fetch_state(domain: str | None = None, api_base: str = API_BASE) -> dict:
    """Fetch current state from the state-hub API.

    Args:
        domain: If given, calls /state/domain/{domain} (cheaper, scoped).
            If None, calls /state/summary (full cross-domain view).
        api_base: Base URL for the state-hub API.

    Returns:
        State dict, or {} on any error (graceful degradation — local-first V2).
    """
    path = f"/state/domain/{domain}" if domain else "/state/summary"
    url = api_base.rstrip("/") + path
    try:
        resp = httpx.get(url, timeout=10.0)
        resp.raise_for_status()
        return resp.json()
    except Exception:
        return {}


def load_constitution() -> str:
    """Load the custodian constitution text for use in the system prompt.

    Returns the full markdown text, or a minimal fallback if the file is missing.
    """
    try:
        return CONSTITUTION_PATH.read_text(encoding="utf-8")
    except FileNotFoundError:
        return _CONSTITUTION_FALLBACK


def build_context(state: dict, constitution: str) -> str:
    """Build the full LLM prompt from current state and constitution.

    The prompt:
      1. Frames the agent's role via the constitution
      2. Summarises the current project state (counts, blockers, open decisions)
      3. Instructs the LLM to return a structured JSON action plan

    Args:
        state: State dict from fetch_state().
        constitution: Constitution text from load_constitution().

    Returns:
        Complete prompt string to pass to the LLM.
    """
    totals = state.get("totals", {})
    tasks = totals.get("tasks", {})
    workstreams = totals.get("workstreams", {})
    decisions = totals.get("decisions", {})

    blocked_tasks = state.get("blocked_tasks", [])
    blocking_decisions = state.get("blocking_decisions", [])
    open_workstreams = state.get("open_workstreams", [])

    # --- State summary section ---
    state_lines = [
        "## Current Project State",
        "",
        f"Tasks: todo={tasks.get('todo', 0)} in_progress={tasks.get('in_progress', 0)} "
        f"blocked={tasks.get('blocked', 0)} done={tasks.get('done', 0)}",
        f"Workstreams: active={workstreams.get('active', 0)} "
        f"completed={workstreams.get('completed', 0)}",
        f"Decisions: open={decisions.get('open', 0)} "
        f"resolved={decisions.get('resolved', 0)}",
    ]

    if blocking_decisions:
        state_lines += ["", "### Blocking Decisions (require resolution before work can proceed)"]
        for d in blocking_decisions:
            state_lines.append(f"- [{d.get('id', '?')}] {d.get('title', 'untitled')}")

    if blocked_tasks:
        state_lines += ["", "### Blocked Tasks"]
        for t in blocked_tasks:
            reason = t.get("blocking_reason", "no reason given")
            state_lines.append(f"- [{t.get('id', '?')}] {t.get('title', 'untitled')}: {reason}")

    if open_workstreams:
        state_lines += ["", "### Open Workstreams"]
        for ws in open_workstreams[:10]:  # cap at 10 to avoid token overflow
            todo = ws.get("tasks_todo", 0)
            done = ws.get("tasks_done", 0)
            state_lines.append(
                f"- [{ws.get('slug', '?')}] {ws.get('title', 'untitled')} "
                f"({done} done / {todo} todo)"
            )

    state_section = "\n".join(state_lines)

    # --- Action format instruction ---
    action_instruction = """
## Your Task

Review the state above and produce a concise action plan. You MUST include a
fenced ```json block with the following structure (use null for optional fields):

```json
{
  "observations": ["<key insight 1>", "<key insight 2>"],
  "progress_events": [
    {
      "summary": "<what happened or was observed>",
      "workstream_id": "<uuid or null>",
      "event_type": "note"
    }
  ],
  "tasks_to_update": [
    {"task_id": "<uuid>", "status": "<done|in_progress|blocked|todo>"}
  ],
  "tasks_to_flag": [
    {"task_id": "<uuid>", "note": "<why human attention is needed>"}
  ]
}
```

Constraints (from constitution):
- Only add progress_events, update task statuses, or flag tasks for human review.
- Do NOT propose financial, legal, or external publication actions.
- If you are uncertain, add a flag_for_human entry rather than acting.
- Keep observations factual and brief.
"""

    return f"""# Custodian Agent — OODA Session

## Constitution (Operating Constraints)

{constitution}

---

{state_section}

---
{action_instruction}"""

runtime/pyproject.toml

@@ -0,0 +1,19 @@
[project]
name = "custodian-agent-runtime"
version = "0.1.0"
description = "Custodian Agent Runtime — OODA loop bootstrap"
requires-python = ">=3.11"
dependencies = [
"httpx>=0.28.0",
"llm-connect",
]
[tool.uv.sources]
llm-connect = { path = "/home/worsch/llm-connect" }
[tool.pytest.ini_options]
testpaths = ["tests"]
pythonpath = ["."]
[dependency-groups]
dev = ["pytest>=8.0"]


@@ -0,0 +1,179 @@
"""Tests for the bounded action executor (actions.py).

All API calls are mocked — no live state-hub required.
"""
from __future__ import annotations

from unittest.mock import MagicMock, call, patch

import pytest

from actions import (
    SANCTIONED_ACTIONS,
    execute,
    parse_plan,
)


# ---------------------------------------------------------------------------
# parse_plan
# ---------------------------------------------------------------------------

class TestParsePlan:
    def test_extracts_json_from_markdown(self):
        response = (
            "Here is my analysis.\n\n"
            "```json\n"
            '{"observations": ["all good"], "progress_events": [], '
            '"tasks_to_update": [], "tasks_to_flag": []}\n'
            "```\n\n"
            "Let me know if you need anything else."
        )
        plan = parse_plan(response)
        assert plan["observations"] == ["all good"]
        assert plan["progress_events"] == []

    def test_returns_empty_plan_on_no_json_block(self):
        plan = parse_plan("Just some text with no JSON block.")
        assert plan == {"progress_events": [], "tasks_to_update": [], "tasks_to_flag": []}

    def test_returns_empty_plan_on_malformed_json(self):
        response = "```json\n{broken json\n```"
        plan = parse_plan(response)
        assert plan == {"progress_events": [], "tasks_to_update": [], "tasks_to_flag": []}

    def test_handles_multiple_json_blocks_uses_first(self):
        response = (
            "```json\n{\"progress_events\": [{\"summary\": \"first\"}], "
            "\"tasks_to_update\": [], \"tasks_to_flag\": []}\n```\n"
            "```json\n{\"progress_events\": [{\"summary\": \"second\"}], "
            "\"tasks_to_update\": [], \"tasks_to_flag\": []}\n```"
        )
        plan = parse_plan(response)
        assert plan["progress_events"][0]["summary"] == "first"

    def test_missing_keys_are_defaulted(self):
        response = '```json\n{"observations": ["noted"]}\n```'
        plan = parse_plan(response)
        assert "progress_events" in plan
        assert "tasks_to_update" in plan
        assert "tasks_to_flag" in plan


# ---------------------------------------------------------------------------
# execute — dry run
# ---------------------------------------------------------------------------

class TestExecuteDryRun:
    def test_dry_run_makes_no_api_calls(self):
        plan = {
            "progress_events": [{"summary": "test", "workstream_id": None, "event_type": "note"}],
            "tasks_to_update": [{"task_id": "t1", "status": "done"}],
            "tasks_to_flag": [{"task_id": "t2", "note": "needs review"}],
        }
        with patch("httpx.post") as mock_post, patch("httpx.patch") as mock_patch:
            results = execute(plan, dry_run=True)
        mock_post.assert_not_called()
        mock_patch.assert_not_called()

    def test_dry_run_returns_descriptions(self):
        plan = {
            "progress_events": [{"summary": "test note", "event_type": "note"}],
            "tasks_to_update": [],
            "tasks_to_flag": [],
        }
        results = execute(plan, dry_run=True)
        assert len(results) == 1
        assert "test note" in results[0] or "dry-run" in results[0].lower()


# ---------------------------------------------------------------------------
# execute — live (mocked API)
# ---------------------------------------------------------------------------

class TestExecuteLive:
    def _ok_response(self, data: dict = None):
        resp = MagicMock()
        resp.status_code = 201
        resp.json.return_value = data or {"id": "new-id"}
        resp.raise_for_status = MagicMock()
        return resp

    def test_posts_progress_event(self):
        plan = {
            "progress_events": [
                {"summary": "session note", "workstream_id": "ws-1", "event_type": "note"}
            ],
            "tasks_to_update": [],
            "tasks_to_flag": [],
        }
        with patch("httpx.post") as mock_post:
            mock_post.return_value = self._ok_response()
            results = execute(plan)
        mock_post.assert_called_once()
        payload = mock_post.call_args[1]["json"]
        assert payload["summary"] == "session note"

    def test_patches_task_status(self):
        plan = {
            "progress_events": [],
            "tasks_to_update": [{"task_id": "abc-123", "status": "done"}],
            "tasks_to_flag": [],
        }
        with patch("httpx.patch") as mock_patch:
            mock_patch.return_value = self._ok_response()
            results = execute(plan)
        mock_patch.assert_called_once()
        call_url = mock_patch.call_args[0][0]
        assert "abc-123" in call_url

    def test_flags_task_for_human(self):
        plan = {
            "progress_events": [],
            "tasks_to_update": [],
            "tasks_to_flag": [{"task_id": "t99", "note": "needs human review"}],
        }
        with patch("httpx.patch") as mock_patch:
            mock_patch.return_value = self._ok_response()
            results = execute(plan)
        mock_patch.assert_called_once()
        payload = mock_patch.call_args[1]["json"]
        assert payload.get("needs_human") is True
        assert "needs human review" in payload.get("intervention_note", "")

    def test_gracefully_handles_api_error(self):
        plan = {
            "progress_events": [{"summary": "test", "event_type": "note"}],
            "tasks_to_update": [],
            "tasks_to_flag": [],
        }
        with patch("httpx.post") as mock_post:
            mock_post.side_effect = Exception("Connection refused")
            # Should not raise — errors are logged in results
            results = execute(plan)
        assert any("error" in r.lower() or "failed" in r.lower() for r in results)

    def test_empty_plan_returns_empty_results(self):
        plan = {"progress_events": [], "tasks_to_update": [], "tasks_to_flag": []}
        results = execute(plan)
        assert results == []


# ---------------------------------------------------------------------------
# sanctioned_actions constant
# ---------------------------------------------------------------------------

class TestSanctionedActions:
    def test_only_three_sanctioned_actions(self):
        assert len(SANCTIONED_ACTIONS) == 3

    def test_contains_expected_actions(self):
        assert "add_progress_event" in SANCTIONED_ACTIONS
        assert "update_task_status" in SANCTIONED_ACTIONS
        assert "flag_for_human" in SANCTIONED_ACTIONS

    def test_no_destructive_actions_sanctioned(self):
        for action in SANCTIONED_ACTIONS:
            assert "delete" not in action.lower()
            assert "destroy" not in action.lower()
            assert "drop" not in action.lower()


@@ -0,0 +1,164 @@
"""Tests for the observation and context-building layer (context.py).
All tests are offline — httpx is mocked so no live state-hub required.
"""
from __future__ import annotations
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from context import (
build_context,
fetch_state,
load_constitution,
CONSTITUTION_PATH,
API_BASE,
)
# ---------------------------------------------------------------------------
# fetch_state
# ---------------------------------------------------------------------------
class TestFetchState:
def _mock_response(self, data: dict, status: int = 200):
resp = MagicMock()
resp.status_code = status
resp.json.return_value = data
resp.raise_for_status = MagicMock()
return resp
def test_fetch_state_calls_summary_endpoint(self):
state_data = {"totals": {"tasks": {"todo": 5}}, "topics": []}
with patch("httpx.get") as mock_get:
mock_get.return_value = self._mock_response(state_data)
result = fetch_state(domain=None)
mock_get.assert_called_once()
call_url = mock_get.call_args[0][0]
assert "/state/summary" in call_url
def test_fetch_state_with_domain_calls_domain_endpoint(self):
domain_data = {"domain": "custodian", "workstreams": []}
with patch("httpx.get") as mock_get:
mock_get.return_value = self._mock_response(domain_data)
result = fetch_state(domain="custodian")
call_url = mock_get.call_args[0][0]
assert "custodian" in call_url
def test_fetch_state_returns_dict(self):
state_data = {"totals": {}, "topics": []}
with patch("httpx.get") as mock_get:
mock_get.return_value = self._mock_response(state_data)
result = fetch_state()
assert isinstance(result, dict)
def test_fetch_state_handles_connection_error(self):
with patch("httpx.get") as mock_get:
mock_get.side_effect = Exception("Connection refused")
result = fetch_state()
# Graceful degradation: returns empty dict, does not raise
assert result == {}
def test_fetch_state_handles_non_200(self):
resp = MagicMock()
resp.raise_for_status.side_effect = Exception("503 Service Unavailable")
with patch("httpx.get") as mock_get:
mock_get.return_value = resp
result = fetch_state()
assert result == {}
def test_fetch_state_custom_api_base(self):
with patch("httpx.get") as mock_get:
mock_get.return_value = self._mock_response({})
fetch_state(api_base="http://localhost:9999")
call_url = mock_get.call_args[0][0]
assert "localhost:9999" in call_url
# ---------------------------------------------------------------------------
# load_constitution
# ---------------------------------------------------------------------------
class TestLoadConstitution:
def test_load_constitution_returns_non_empty_string(self):
text = load_constitution()
assert isinstance(text, str)
assert len(text) > 100
def test_load_constitution_contains_key_clauses(self):
text = load_constitution()
assert "Custodian" in text
assert "Forbidden" in text or "forbidden" in text.lower()
def test_constitution_path_exists(self):
assert CONSTITUTION_PATH.exists(), (
f"Constitution not found at {CONSTITUTION_PATH}. "
"The path is hardcoded relative to this file — check context.py."
)
def test_load_constitution_with_missing_file(self, tmp_path, monkeypatch):
"""If constitution is missing, return a minimal fallback, not an exception."""
import context as ctx_module
monkeypatch.setattr(ctx_module, "CONSTITUTION_PATH",
tmp_path / "nonexistent.md")
text = load_constitution()
assert isinstance(text, str)
# Should return fallback, not crash
assert len(text) > 0


# ---------------------------------------------------------------------------
# build_context
# ---------------------------------------------------------------------------

class TestBuildContext:
    def _minimal_state(self) -> dict:
        return {
            "totals": {
                "tasks": {"todo": 3, "done": 10},
                "workstreams": {"active": 2, "completed": 5},
                "decisions": {"open": 0},
            },
            "blocking_decisions": [],
            "blocked_tasks": [],
            "open_workstreams": [],
        }

    def test_build_context_returns_string(self):
        ctx = build_context(self._minimal_state(), "## Constitution\nBe safe.")
        assert isinstance(ctx, str)

    def test_build_context_includes_constitution(self):
        ctx = build_context(self._minimal_state(), "## Constitution\nBe safe.")
        assert "Be safe" in ctx

    def test_build_context_includes_task_counts(self):
        ctx = build_context(self._minimal_state(), "")
        assert "3" in ctx  # todo count

    def test_build_context_mentions_blocked_tasks_when_present(self):
        state = self._minimal_state()
        state["blocked_tasks"] = [
            {"id": "t1", "title": "Deploy postgres", "blocking_reason": "no cluster"}
        ]
        ctx = build_context(state, "")
        assert "Deploy postgres" in ctx or "blocked" in ctx.lower()

    def test_build_context_mentions_blocking_decisions_when_present(self):
        state = self._minimal_state()
        state["blocking_decisions"] = [
            {"id": "d1", "title": "Which DB?", "type": "pending"}
        ]
        ctx = build_context(state, "")
        assert "Which DB?" in ctx or "decision" in ctx.lower()

    def test_build_context_with_empty_state_does_not_crash(self):
        ctx = build_context({}, "some constitution")
        assert isinstance(ctx, str)

    def test_build_context_includes_json_response_instruction(self):
        """The prompt must instruct the LLM to return a JSON block."""
        ctx = build_context(self._minimal_state(), "")
        assert "```json" in ctx or "JSON" in ctx
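
Reviewer note: the `fetch_state` tests above pin down a graceful-degradation contract, where any transport error or non-2xx response yields `{}` instead of an exception, and the `api_base` argument ends up in the requested URL. The following is a minimal sketch of a function that would satisfy that contract, not the actual code in `runtime/context.py`. The default base URL, the `/state/summary` path, the `timeout` value, and the injectable `http_get` parameter are all assumptions made for illustration.

```python
from typing import Any, Callable, Optional

# Hypothetical defaults; the real values live in runtime/context.py.
DEFAULT_API_BASE = "http://localhost:8000"
SUMMARY_PATH = "/state/summary"


def fetch_state(
    api_base: str = DEFAULT_API_BASE,
    http_get: Optional[Callable[..., Any]] = None,
    timeout: float = 5.0,
) -> dict:
    """Return the state summary as a dict, or {} if anything goes wrong."""
    if http_get is None:
        # Looked up at call time, so patch("httpx.get") in tests still applies.
        import httpx
        http_get = httpx.get

    url = f"{api_base.rstrip('/')}{SUMMARY_PATH}"
    try:
        resp = http_get(url, timeout=timeout)
        resp.raise_for_status()  # raises on 4xx/5xx responses
        data = resp.json()
    except Exception:
        # Graceful degradation: the Observe step never raises.
        return {}
    # Guard against non-object payloads (e.g. a JSON list or null).
    return data if isinstance(data, dict) else {}
```

Swallowing every exception keeps the single-pass loop alive when the state hub is down, at the cost of hiding the failure reason; logging the exception before returning `{}` would be a reasonable addition.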

runtime/uv.lock generated Normal file

@@ -0,0 +1,186 @@
version = 1
requires-python = ">=3.11"
[[package]]
name = "anyio"
version = "4.12.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "idna" },
{ name = "typing-extensions", marker = "python_full_version < '3.13'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/96/f0/5eb65b2bb0d09ac6776f2eb54adee6abe8228ea05b20a5ad0e4945de8aac/anyio-4.12.1.tar.gz", hash = "sha256:41cfcc3a4c85d3f05c932da7c26d0201ac36f72abd4435ba90d0464a3ffed703", size = 228685 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/38/0e/27be9fdef66e72d64c0cdc3cc2823101b80585f8119b5c112c2e8f5f7dab/anyio-4.12.1-py3-none-any.whl", hash = "sha256:d405828884fc140aa80a3c667b8beed277f1dfedec42ba031bd6ac3db606ab6c", size = 113592 },
]
[[package]]
name = "certifi"
version = "2026.2.25"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/af/2d/7bf41579a8986e348fa033a31cdd0e4121114f6bce2457e8876010b092dd/certifi-2026.2.25.tar.gz", hash = "sha256:e887ab5cee78ea814d3472169153c2d12cd43b14bd03329a39a9c6e2e80bfba7", size = 155029 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/9a/3c/c17fb3ca2d9c3acff52e30b309f538586f9f5b9c9cf454f3845fc9af4881/certifi-2026.2.25-py3-none-any.whl", hash = "sha256:027692e4402ad994f1c42e52a4997a9763c646b73e4096e4d5d6db8af1d6f0fa", size = 153684 },
]
[[package]]
name = "colorama"
version = "0.4.6"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d8/53/6f443c9a4a8358a93a6792e2acffb9d9d5cb0a5cfd8802644b7b1c9a02e4/colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44", size = 27697 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335 },
]
[[package]]
name = "custodian-agent-runtime"
version = "0.1.0"
source = { virtual = "." }
dependencies = [
{ name = "httpx" },
{ name = "llm-connect" },
]
[package.dev-dependencies]
dev = [
{ name = "pytest" },
]
[package.metadata]
requires-dist = [
{ name = "httpx", specifier = ">=0.28.0" },
{ name = "llm-connect", directory = "../../llm-connect" },
]
[package.metadata.requires-dev]
dev = [{ name = "pytest", specifier = ">=8.0" }]
[[package]]
name = "h11"
version = "0.16.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/01/ee/02a2c011bdab74c6fb3c75474d40b3052059d95df7e73351460c8588d963/h11-0.16.0.tar.gz", hash = "sha256:4e35b956cf45792e4caa5885e69fba00bdbc6ffafbfa020300e549b208ee5ff1", size = 101250 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/04/4b/29cac41a4d98d144bf5f6d33995617b185d14b22401f75ca86f384e87ff1/h11-0.16.0-py3-none-any.whl", hash = "sha256:63cf8bbe7522de3bf65932fda1d9c2772064ffb3dae62d55932da54b31cb6c86", size = 37515 },
]
[[package]]
name = "httpcore"
version = "1.0.9"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "certifi" },
{ name = "h11" },
]
sdist = { url = "https://files.pythonhosted.org/packages/06/94/82699a10bca87a5556c9c59b5963f2d039dbd239f25bc2a63907a05a14cb/httpcore-1.0.9.tar.gz", hash = "sha256:6e34463af53fd2ab5d807f399a9b45ea31c3dfa2276f15a2c3f00afff6e176e8", size = 85484 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/7e/f5/f66802a942d491edb555dd61e3a9961140fd64c90bce1eafd741609d334d/httpcore-1.0.9-py3-none-any.whl", hash = "sha256:2d400746a40668fc9dec9810239072b40b4484b640a8c38fd654a024c7a1bf55", size = 78784 },
]
[[package]]
name = "httpx"
version = "0.28.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "anyio" },
{ name = "certifi" },
{ name = "httpcore" },
{ name = "idna" },
]
sdist = { url = "https://files.pythonhosted.org/packages/b1/df/48c586a5fe32a0f01324ee087459e112ebb7224f646c0b5023f5e79e9956/httpx-0.28.1.tar.gz", hash = "sha256:75e98c5f16b0f35b567856f597f06ff2270a374470a5c2392242528e3e3e42fc", size = 141406 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/2a/39/e50c7c3a983047577ee07d2a9e53faf5a69493943ec3f6a384bdc792deb2/httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad", size = 73517 },
]
[[package]]
name = "idna"
version = "3.11"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/6f/6d/0703ccc57f3a7233505399edb88de3cbd678da106337b9fcde432b65ed60/idna-3.11.tar.gz", hash = "sha256:795dafcc9c04ed0c1fb032c2aa73654d8e8c5023a7df64a53f39190ada629902", size = 194582 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/0e/61/66938bbb5fc52dbdf84594873d5b51fb1f7c7794e9c0f5bd885f30bc507b/idna-3.11-py3-none-any.whl", hash = "sha256:771a87f49d9defaf64091e6e6fe9c18d4833f140bd19464795bc32d966ca37ea", size = 71008 },
]
[[package]]
name = "iniconfig"
version = "2.3.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/72/34/14ca021ce8e5dfedc35312d08ba8bf51fdd999c576889fc2c24cb97f4f10/iniconfig-2.3.0.tar.gz", hash = "sha256:c76315c77db068650d49c5b56314774a7804df16fee4402c1f19d6d15d8c4730", size = 20503 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/cb/b1/3846dd7f199d53cb17f49cba7e651e9ce294d8497c8c150530ed11865bb8/iniconfig-2.3.0-py3-none-any.whl", hash = "sha256:f631c04d2c48c52b84d0d0549c99ff3859c98df65b3101406327ecc7d53fbf12", size = 7484 },
]
[[package]]
name = "llm-connect"
version = "0.1.0"
source = { directory = "../../llm-connect" }
dependencies = [
{ name = "toml" },
]
[package.metadata]
requires-dist = [
{ name = "pytest", marker = "extra == 'dev'", specifier = ">=7.0" },
{ name = "toml" },
]
[[package]]
name = "packaging"
version = "26.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/65/ee/299d360cdc32edc7d2cf530f3accf79c4fca01e96ffc950d8a52213bd8e4/packaging-26.0.tar.gz", hash = "sha256:00243ae351a257117b6a241061796684b084ed1c516a08c48a3f7e147a9d80b4", size = 143416 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/b7/b9/c538f279a4e237a006a2c98387d081e9eb060d203d8ed34467cc0f0b9b53/packaging-26.0-py3-none-any.whl", hash = "sha256:b36f1fef9334a5588b4166f8bcd26a14e521f2b55e6b9de3aaa80d3ff7a37529", size = 74366 },
]
[[package]]
name = "pluggy"
version = "1.6.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/f9/e2/3e91f31a7d2b083fe6ef3fa267035b518369d9511ffab804f839851d2779/pluggy-1.6.0.tar.gz", hash = "sha256:7dcc130b76258d33b90f61b658791dede3486c3e6bfb003ee5c9bfb396dd22f3", size = 69412 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538 },
]
[[package]]
name = "pygments"
version = "2.19.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/b0/77/a5b8c569bf593b0140bde72ea885a803b82086995367bf2037de0159d924/pygments-2.19.2.tar.gz", hash = "sha256:636cb2477cec7f8952536970bc533bc43743542f70392ae026374600add5b887", size = 4968631 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c7/21/705964c7812476f378728bdf590ca4b771ec72385c533964653c68e86bdc/pygments-2.19.2-py3-none-any.whl", hash = "sha256:86540386c03d588bb81d44bc3928634ff26449851e99741617ecb9037ee5ec0b", size = 1225217 },
]
[[package]]
name = "pytest"
version = "9.0.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "colorama", marker = "sys_platform == 'win32'" },
{ name = "iniconfig" },
{ name = "packaging" },
{ name = "pluggy" },
{ name = "pygments" },
]
sdist = { url = "https://files.pythonhosted.org/packages/d1/db/7ef3487e0fb0049ddb5ce41d3a49c235bf9ad299b6a25d5780a89f19230f/pytest-9.0.2.tar.gz", hash = "sha256:75186651a92bd89611d1d9fc20f0b4345fd827c41ccd5c299a868a05d70edf11", size = 1568901 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/3b/ab/b3226f0bd7cdcf710fbede2b3548584366da3b19b5021e74f5bde2a8fa3f/pytest-9.0.2-py3-none-any.whl", hash = "sha256:711ffd45bf766d5264d487b917733b453d917afd2b0ad65223959f59089f875b", size = 374801 },
]
[[package]]
name = "toml"
version = "0.10.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/be/ba/1f744cdc819428fc6b5084ec34d9b30660f6f9daaf70eead706e3203ec3c/toml-0.10.2.tar.gz", hash = "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f", size = 22253 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/44/6f/7120676b6d73228c96e17f1f794d8ab046fc910d781c8d151120c3f1569e/toml-0.10.2-py2.py3-none-any.whl", hash = "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b", size = 16588 },
]
[[package]]
name = "typing-extensions"
version = "4.15.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/72/94/1a15dd82efb362ac84269196e94cf00f187f7ed21c242792a923cdb1c61f/typing_extensions-4.15.0.tar.gz", hash = "sha256:0cea48d173cc12fa28ecabc3b837ea3cf6f38c6d1136f85cbaaf598984861466", size = 109391 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/18/67/36e9267722cc04a6b9f15c7f3441c2363321a3ea07da7ae0c0707beb2a9c/typing_extensions-4.15.0-py3-none-any.whl", hash = "sha256:f0fa19c6845758ab08074a0cfa8b7aecb71c999ca73d62883bc25cc018c4e548", size = 44614 },
]


@@ -3,12 +3,12 @@ id: CUST-WP-0001
type: workplan
title: "Custodian Agent Runtime — Bootstrap"
domain: custodian
-status: active
+status: done
owner: custodian
topic_slug: custodian
state_hub_workstream_id: a2d9919d-62ec-49e7-9533-ba650757e70a
created: "2026-02-25"
-updated: "2026-03-11"
+updated: "2026-03-12"
---
# Custodian Agent Runtime — Bootstrap
@@ -59,7 +59,7 @@ editable dependency; markitect llm-check smoke test green.
```task
id: CUST-WP-0001-T02
state_hub_task_id: 9a9297cd-bd3c-409c-8384-6f06cfc6faa2
-status: todo
+status: done
priority: medium
```