Files
the-custodian/runtime/context.py
tegwick 2fdbcb5d7a feat(CUST-WP-0001): implement Custodian Agent Runtime bootstrap
T2 complete: OODA loop skeleton with LLM integration, bounded actions,
and 32 offline unit tests.

Deliverables:
- runtime/agent.py     — CLI entry point (--domain/--all/--dry-run/--llm)
- runtime/context.py   — Observe: fetch_state + build_context
- runtime/actions.py   — Act: parse_plan + execute (3 sanctioned writes)
- runtime/README.md    — usage guide and architecture overview
- runtime/tests/       — 32 tests, fully offline
- runtime/pyproject.toml — standalone package with llm-connect dep
- canon/architecture/adr-002-custodian-agent-runtime-design.md

Key design decisions (ADR-002):
- Lives in runtime/ (not a new repo) — tight canon/state-hub coupling
- ClaudeCodeAdapter by default (local-first, no API key)
- Single-pass synchronous OODA for v0.1 simplicity
- Exactly 3 sanctioned write ops: add_progress_event, update_task_status, flag_for_human
- LLM returns JSON block in markdown for structured+auditable output

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 22:36:24 +01:00

161 lines
5.1 KiB
Python

"""Observation layer — fetches state-hub data and builds LLM context.
Observe step of the OODA loop:
- fetch_state: HTTP GET to state-hub /state/summary or /state/domain/{slug}
- load_constitution: reads the custodian constitution for the system prompt
- build_context: assembles the full prompt sent to the LLM
"""
from __future__ import annotations
import json
from pathlib import Path
import httpx
API_BASE = "http://127.0.0.1:8000"
CONSTITUTION_PATH = Path(__file__).parent.parent / "canon" / "constitution" / "custodian_constitution_v0.1.md"
_CONSTITUTION_FALLBACK = (
"You are the Custodian agent. Act as a bounded co-creator. "
"Do not take irreversible actions. Escalate when uncertain."
)
def fetch_state(domain: str | None = None, api_base: str = API_BASE) -> dict:
"""Fetch current state from the state-hub API.
Args:
domain: If given, calls /state/domain/{domain} (cheaper, scoped).
If None, calls /state/summary (full cross-domain view).
api_base: Base URL for the state-hub API.
Returns:
State dict, or {} on any error (graceful degradation — local-first V2).
"""
path = f"/state/domain/{domain}" if domain else "/state/summary"
url = api_base.rstrip("/") + path
try:
resp = httpx.get(url, timeout=10.0)
resp.raise_for_status()
return resp.json()
except Exception:
return {}
def load_constitution() -> str:
"""Load the custodian constitution text for use in the system prompt.
Returns the full markdown text, or a minimal fallback if the file is missing.
"""
try:
return CONSTITUTION_PATH.read_text(encoding="utf-8")
except FileNotFoundError:
return _CONSTITUTION_FALLBACK
def build_context(state: dict, constitution: str) -> str:
"""Build the full LLM prompt from current state and constitution.
The prompt:
1. Frames the agent's role via the constitution
2. Summarises the current project state (counts, blockers, open decisions)
3. Instructs the LLM to return a structured JSON action plan
Args:
state: State dict from fetch_state().
constitution: Constitution text from load_constitution().
Returns:
Complete prompt string to pass to the LLM.
"""
totals = state.get("totals", {})
tasks = totals.get("tasks", {})
workstreams = totals.get("workstreams", {})
decisions = totals.get("decisions", {})
blocked_tasks = state.get("blocked_tasks", [])
blocking_decisions = state.get("blocking_decisions", [])
open_workstreams = state.get("open_workstreams", [])
# --- State summary section ---
state_lines = [
"## Current Project State",
"",
f"Tasks: todo={tasks.get('todo', 0)} in_progress={tasks.get('in_progress', 0)} "
f"blocked={tasks.get('blocked', 0)} done={tasks.get('done', 0)}",
f"Workstreams: active={workstreams.get('active', 0)} "
f"completed={workstreams.get('completed', 0)}",
f"Decisions: open={decisions.get('open', 0)} "
f"resolved={decisions.get('resolved', 0)}",
]
if blocking_decisions:
state_lines += ["", "### Blocking Decisions (require resolution before work can proceed)"]
for d in blocking_decisions:
state_lines.append(f"- [{d.get('id', '?')}] {d.get('title', 'untitled')}")
if blocked_tasks:
state_lines += ["", "### Blocked Tasks"]
for t in blocked_tasks:
reason = t.get("blocking_reason", "no reason given")
state_lines.append(f"- [{t.get('id', '?')}] {t.get('title', 'untitled')}: {reason}")
if open_workstreams:
state_lines += ["", "### Open Workstreams"]
for ws in open_workstreams[:10]: # cap at 10 to avoid token overflow
todo = ws.get("tasks_todo", 0)
done = ws.get("tasks_done", 0)
state_lines.append(
f"- [{ws.get('slug', '?')}] {ws.get('title', 'untitled')} "
f"({done} done / {todo} todo)"
)
state_section = "\n".join(state_lines)
# --- Action format instruction ---
action_instruction = """
## Your Task
Review the state above and produce a concise action plan. You MUST include a
fenced ```json block with the following structure (use null for optional fields):
```json
{
"observations": ["<key insight 1>", "<key insight 2>"],
"progress_events": [
{
"summary": "<what happened or was observed>",
"workstream_id": "<uuid or null>",
"event_type": "note"
}
],
"tasks_to_update": [
{"task_id": "<uuid>", "status": "<done|in_progress|blocked|todo>"}
],
"tasks_to_flag": [
{"task_id": "<uuid>", "note": "<why human attention is needed>"}
]
}
```
Constraints (from constitution):
- Only add progress_events, update task statuses, or flag tasks for human review.
- Do NOT propose financial, legal, or external publication actions.
- If you are uncertain, add a flag_for_human entry rather than acting.
- Keep observations factual and brief.
"""
return f"""# Custodian Agent — OODA Session
## Constitution (Operating Constraints)
{constitution}
---
{state_section}
---
{action_instruction}"""