From 55e36bdf2de5ad1ae9cfb00afcfe7ac74ed398b2 Mon Sep 17 00:00:00 2001 From: tegwick Date: Sun, 7 Jun 2026 20:11:07 +0200 Subject: [PATCH] feat: add State Hub bulk status skill --- api/routers/tasks.py | 88 ++++++++++++- api/schemas/task.py | 24 ++++ mcp_server/TOOLS.md | 2 + mcp_server/server.py | 28 ++++ skills/state-hub/SKILL.md | 65 ++++++++++ .../state-hub/references/tool-signatures.md | 97 ++++++++++++++ tests/test_mcp_write_tools.py | 44 +++++++ tests/test_task_bulk_status_sync.py | 121 ++++++++++++++++++ ...ATE-WP-0058-agent-skill-hub-interaction.md | 32 ++++- 9 files changed, 496 insertions(+), 5 deletions(-) create mode 100644 skills/state-hub/SKILL.md create mode 100644 skills/state-hub/references/tool-signatures.md create mode 100644 tests/test_task_bulk_status_sync.py diff --git a/api/routers/tasks.py b/api/routers/tasks.py index 45a5d67..20526a3 100644 --- a/api/routers/tasks.py +++ b/api/routers/tasks.py @@ -6,10 +6,18 @@ from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from api.database import get_session +from api.models.progress_event import ProgressEvent from api.models.task import Task, TaskStatus from api.models.token_event import TokenEvent from api.models.workstream import Workstream -from api.schemas.task import TaskCountRead, TaskCreate, TaskRead, TaskUpdate +from api.schemas.task import ( + TaskCountRead, + TaskCreate, + TaskRead, + TaskStatusBulkSync, + TaskStatusBulkSyncRead, + TaskUpdate, +) from api.services.lifecycle import status_value, transition_task_status from api.task_status import normalize_task_status @@ -88,6 +96,84 @@ async def create_task( return task +@router.post("/bulk-status-sync", response_model=TaskStatusBulkSyncRead) +async def bulk_status_sync( + body: TaskStatusBulkSync, + session: AsyncSession = Depends(get_session), +) -> TaskStatusBulkSyncRead: + seen: set[uuid.UUID] = set() + duplicate_ids: list[str] = [] + tasks_by_id: dict[uuid.UUID, Task] = {} + missing_ids: list[str] = [] + + for update in body.updates: + if update.task_id in seen: + duplicate_ids.append(str(update.task_id)) + continue + seen.add(update.task_id) + task = await session.get(Task, update.task_id) + if task is None: + missing_ids.append(str(update.task_id)) + else: + tasks_by_id[update.task_id] = task + + if duplicate_ids: + raise HTTPException( + status_code=400, + detail={"message": "duplicate task_id values are not allowed", "task_ids": duplicate_ids}, + ) + if missing_ids: + raise HTTPException( + status_code=404, + detail={"message": "one or more tasks were not found", "task_ids": missing_ids}, + ) + + updated: list[Task] = [] + events: list[ProgressEvent] = [] + author = body.author or "custodian" + for update in body.updates: + task = tasks_by_id[update.task_id] + previous_status = status_value(task.status) + target_status = status_value(update.status) + if update.blocking_reason is not None: + task.blocking_reason = update.blocking_reason + ws = await session.get(Workstream, task.workstream_id) + transition_task_status( + task, + update.status, + parent_workstream=ws, + previous_task_status=previous_status, + ) + event = ProgressEvent( + task_id=task.id, + workstream_id=task.workstream_id, + event_type="task_status_changed", + summary=f"Task status -> {target_status}: {task.title}", + author=author, + session_id=body.session_id, + detail={ + "bulk_status_sync": True, + "previous_status": previous_status, + "status": target_status, + "blocking_reason": update.blocking_reason, + }, + ) + session.add(event) + updated.append(task) + events.append(event) + + await session.commit() + for task in updated: + await session.refresh(task) + for event in events: + await session.refresh(event) + + return TaskStatusBulkSyncRead( + updated=updated, + progress_event_ids=[event.id for event in events], + ) + + @router.get("/{task_id}", response_model=TaskRead) async def get_task( task_id: uuid.UUID, diff --git a/api/schemas/task.py b/api/schemas/task.py index 1b2ff9f..3603788 100644 --- a/api/schemas/task.py +++ b/api/schemas/task.py @@ -77,6 +77,25 @@ class TaskUpdate(TaskStatusMixin): return self +class TaskStatusBulkUpdate(TaskStatusMixin): + task_id: uuid.UUID + status: TaskStatus + blocking_reason: str | None = None + + +class TaskStatusBulkSync(BaseModel): + updates: list[TaskStatusBulkUpdate] + author: str | None = "custodian" + session_id: str | None = None + + @field_validator("updates") + @classmethod + def updates_required(cls, value: list[TaskStatusBulkUpdate]): + if not value: + raise ValueError("at least one task status update is required") + return value + + class TaskRead(TaskStatusMixin): model_config = ConfigDict(from_attributes=True) id: uuid.UUID @@ -99,3 +118,8 @@ class TaskCountRead(TaskStatusMixin): workstream_id: uuid.UUID status: TaskStatus count: int + + +class TaskStatusBulkSyncRead(BaseModel): + updated: list[TaskRead] + progress_event_ids: list[uuid.UUID] diff --git a/mcp_server/TOOLS.md b/mcp_server/TOOLS.md index cc970cc..4ecda3c 100644 --- a/mcp_server/TOOLS.md +++ b/mcp_server/TOOLS.md @@ -31,6 +31,7 @@ endpoint they wrap: | `create_workstream(...)` | `POST /workstreams/` | | `create_task(...)` | `POST /tasks/` | | `update_task_status(...)` | `PATCH /tasks/{task_id}` | +| `bulk_update_task_statuses(...)` | `POST /tasks/bulk-status-sync` | | `record_decision(...)` | `POST /decisions/` | | `add_progress_event(...)` | `POST /progress/` | @@ -93,6 +94,7 @@ entity while recording the missing progress event. | `create_workstream(topic_id, title, ...)` | `slug?`; `owner?`; `description?`; `due_date?` | Creates workstream under a topic. Use `get_state_summary()` to find topic IDs. | | `create_task(workstream_id, title, ...)` | `priority`: low/medium/high/critical; `assignee?`; `due_date?` | Creates task under a workstream. | | `update_task_status(task_id, status, ...)` | `status`: wait/todo/progress/done/cancel; `blocking_reason?` describes wait conditions | Legacy aliases `blocked`, `in_progress`, `cancelled`, and `canceled` are accepted during migration. | +| `bulk_update_task_statuses(updates, author?, session_id?)` | `updates`: list of `{task_id, status, blocking_reason?}` | Updates many task statuses in one REST call and emits one `task_status_changed` progress event per task. Prefer this at session checkpoints instead of many single-task calls. | | `update_workstream_status(workstream_id, status)` | `status`: proposed/ready/active/blocked/backlog/finished/archived | Thin shortcut — use `update_workstream` for full field control. | | `update_workstream(workstream_id, ...)` | `title?`; `description?`; `owner?`; `due_date?`; `repo_goal_id?`; `status?` | Patch any subset of workstream fields. Pass empty string for `repo_goal_id` to clear the link. | diff --git a/mcp_server/server.py b/mcp_server/server.py index edfbae2..95d7d88 100644 --- a/mcp_server/server.py +++ b/mcp_server/server.py @@ -633,6 +633,34 @@ def update_task_status( return _json_result(task) +@mcp.tool() +def bulk_update_task_statuses( + updates: list[dict[str, Any]], + author: str | None = "custodian", + session_id: str | None = None, +) -> str: + """Update many task statuses in one call and emit one progress_event per task. + + Args: + updates: list of {task_id, status, blocking_reason?}; status values are + wait | todo | progress | done | cancel + author: optional progress event author (defaults to custodian) + session_id: optional agent session identifier for progress events + """ + result = _post("/tasks/bulk-status-sync", { + "updates": updates, + "author": author, + "session_id": session_id, + }) + if error := _response_error( + "bulk_update_task_statuses", + result, + ("updated", "progress_event_ids"), + ): + return _json_result(error) + return _json_result(result) + + @mcp.tool() def flag_for_human(task_id: str, note: str) -> str: """Flag a task as requiring human intervention. diff --git a/skills/state-hub/SKILL.md b/skills/state-hub/SKILL.md new file mode 100644 index 0000000..a6b3aa7 --- /dev/null +++ b/skills/state-hub/SKILL.md @@ -0,0 +1,65 @@ +--- +name: state-hub +description: Use when coordinating with Custodian State Hub: orienting with domain summaries, checking agent inbox messages, updating workplan-backed task status, recording decisions/progress, or batching task status sync through MCP/REST without re-discovering tool schemas. +--- + +# State Hub Coordination + +Use this skill at the start and close of State Hub aware coding sessions. The +hub is a read/cache/index model over repo-owned workplan files; do not invent +work structure in the hub when a workplan file is the canon. + +## Session Flow + +1. Orient with `get_domain_summary(domain_slug)` when working inside one domain + repo. Use `get_state_summary()` only for cross-domain/custodian-wide work. +2. Check inbox with `get_messages(to_agent=, unread_only=true)`. + Mark acted-on messages with `mark_message_read(message_id)`. +3. During work, edit the workplan file first. Mirror task/workstream status to + the hub at checkpoints. +4. Prefer `bulk_update_task_statuses(...)` for checkpoint syncs with multiple + task updates. Use `update_task_status(...)` for one-off changes. +5. Close with one concise `add_progress_event(...)`, then run the repo's + `make fix-consistency REPO=` command when workplan files changed. + +## High-Frequency MCP Signatures + +```text +get_domain_summary(domain_slug: str) -> str +get_messages(to_agent?: str, from_agent?: str, unread_only: bool = false, limit: int = 20) -> str +send_message(from_agent: str, to_agent: str, subject: str, body: str, thread_id?: str) -> str + +create_workstream(topic_id: str, title: str, slug?: str, description?: str, owner?: str, due_date?: str, repo_id?: str, planning_priority?: str, planning_order?: int) -> str +create_task(workstream_id: str, title: str, priority: str = "medium", description?: str, assignee?: str, due_date?: str) -> str +update_task_status(task_id: str, status: str, blocking_reason?: str, tokens_in?: int, tokens_out?: int, workplan_tokens_in?: int, workplan_tokens_out?: int, note?: str, model?: str, agent?: str, session_id?: str) -> str +bulk_update_task_statuses(updates: list[dict], author?: str = "custodian", session_id?: str) -> str +add_progress_event(summary: str, event_type: str = "note", topic_id?: str, workstream_id?: str, task_id?: str, detail?: dict | str) -> str +record_decision(title: str, decision_type: str = "pending", topic_id?: str, workstream_id?: str, description?: str, rationale?: str, decided_by?: str, deadline?: str) -> str +``` + +`bulk_update_task_statuses` updates `N` task statuses in one call: + +```json +{ + "updates": [ + {"task_id": "", "status": "progress"}, + {"task_id": "", "status": "wait", "blocking_reason": "waiting for operator"} + ], + "author": "codex", + "session_id": "" +} +``` + +Each bulk item emits a `task_status_changed` progress event. Keep separate +progress notes coarse: milestones, blockers, handoffs, or final summaries. + +## Boundaries + +- Canon lives in files: workplans, `INTENT.md`, repo docs, and commits. +- The hub indexes and broadcasts state; it is not a substitute workplan author. +- For new multi-step work, create or update the workplan file, then sync. +- If MCP returns an error payload, use the matching REST endpoint as fallback + and record what happened once the write succeeds. + +For REST paths, response shapes, and fallback examples, read +`references/tool-signatures.md`. diff --git a/skills/state-hub/references/tool-signatures.md b/skills/state-hub/references/tool-signatures.md new file mode 100644 index 0000000..18decfa --- /dev/null +++ b/skills/state-hub/references/tool-signatures.md @@ -0,0 +1,97 @@ +# State Hub Tool Signatures + +Load this reference when a session needs exact REST fallback paths or batched +write payloads. + +## Orientation + +```text +MCP: get_domain_summary(domain_slug) +REST: GET /state/summary then filter by topic/domain when MCP is unavailable +``` + +Use `get_domain_summary("custodian")` inside State Hub work. It returns the +domain topic, active workstreams, blocking decisions, recent progress, repos, +and compact capability hints. + +## Agent Messages + +```text +MCP: get_messages(to_agent?, from_agent?, unread_only?, limit?) +REST: GET /messages/?to_agent=&unread_only=true + +MCP: send_message(from_agent, to_agent, subject, body, thread_id?) +REST: POST /messages/ + +MCP: mark_message_read(message_id) +REST: PATCH /messages/{message_id}/read +``` + +Use repo slugs as agent names. Use `broadcast` only for genuinely shared +coordination. + +## Workstreams and Tasks + +```text +MCP: create_workstream(topic_id, title, slug?, description?, owner?, due_date?, repo_id?, planning_priority?, planning_order?) +REST: POST /workstreams/ + +MCP: create_task(workstream_id, title, priority="medium", description?, assignee?, due_date?) +REST: POST /tasks/ + +MCP: update_task_status(task_id, status, blocking_reason?, tokens_in?, tokens_out?, workplan_tokens_in?, workplan_tokens_out?, note?, model?, agent?, session_id?) +REST: PATCH /tasks/{task_id} +``` + +Canonical task statuses are `wait`, `todo`, `progress`, `done`, and `cancel`. +Legacy aliases are accepted during migration, but do not emit new workplan files +with old vocabulary. + +## Bulk Task Status Sync + +```text +MCP: bulk_update_task_statuses(updates, author?, session_id?) +REST: POST /tasks/bulk-status-sync +``` + +Payload: + +```json +{ + "updates": [ + {"task_id": "uuid-1", "status": "progress"}, + {"task_id": "uuid-2", "status": "done"}, + {"task_id": "uuid-3", "status": "wait", "blocking_reason": "needs approval"} + ], + "author": "codex", + "session_id": "optional-session-id" +} +``` + +Response: + +```json +{ + "updated": [ + {"id": "uuid-1", "status": "progress", "...": "..."} + ], + "progress_event_ids": ["event-uuid-1"] +} +``` + +The endpoint rejects duplicate task ids with `400` and missing task ids with +`404` before changing any task. Each successful item emits one +`task_status_changed` progress event with `detail.bulk_status_sync = true`. + +## Progress and Decisions + +```text +MCP: add_progress_event(summary, event_type="note", topic_id?, workstream_id?, task_id?, detail?) +REST: POST /progress/ + +MCP: record_decision(title, decision_type="pending", topic_id?, workstream_id?, description?, rationale?, decided_by?, deadline?) +REST: POST /decisions/ +``` + +Prefer one progress event per checkpoint. A useful close event says what changed, +which tests ran, whether consistency sync passed, and what remains. diff --git a/tests/test_mcp_write_tools.py b/tests/test_mcp_write_tools.py index 0eda9aa..67c7813 100644 --- a/tests/test_mcp_write_tools.py +++ b/tests/test_mcp_write_tools.py @@ -180,6 +180,50 @@ class TestMCPWriteTools: ) ] + async def test_bulk_update_task_statuses_returns_rest_shape(self, monkeypatch): + calls: list[tuple[str, dict[str, Any]]] = [] + + def fake_post(path: str, body: dict[str, Any]) -> dict[str, Any]: + calls.append((path, body)) + assert path == "/tasks/bulk-status-sync" + return { + "updated": [ + {"id": "task-1", "title": "First", "status": "done"}, + {"id": "task-2", "title": "Second", "status": "wait"}, + ], + "progress_event_ids": ["event-1", "event-2"], + } + + monkeypatch.setattr(server, "_post", fake_post) + + body = await _call_tool( + "bulk_update_task_statuses", + { + "author": "codex", + "session_id": "session-1", + "updates": [ + {"task_id": "task-1", "status": "done"}, + {"task_id": "task-2", "status": "wait", "blocking_reason": "needs input"}, + ], + }, + ) + + assert body["progress_event_ids"] == ["event-1", "event-2"] + assert [task["status"] for task in body["updated"]] == ["done", "wait"] + assert calls == [ + ( + "/tasks/bulk-status-sync", + { + "updates": [ + {"task_id": "task-1", "status": "done"}, + {"task_id": "task-2", "status": "wait", "blocking_reason": "needs input"}, + ], + "author": "codex", + "session_id": "session-1", + }, + ) + ] + async def test_record_decision_returns_rest_shape_and_emits_progress(self, monkeypatch): calls: list[tuple[str, dict[str, Any]]] = [] diff --git a/tests/test_task_bulk_status_sync.py b/tests/test_task_bulk_status_sync.py new file mode 100644 index 0000000..50415b9 --- /dev/null +++ b/tests/test_task_bulk_status_sync.py @@ -0,0 +1,121 @@ +from __future__ import annotations + + +async def _create_domain(client, slug: str = "bulk-domain"): + r = await client.post("/domains/", json={"slug": slug, "name": "Bulk Domain"}) + assert r.status_code == 201 + return r.json() + + +async def _create_topic(client, domain_slug: str = "bulk-domain"): + r = await client.post( + "/topics/", + json={"slug": "bulk-topic", "title": "Bulk Topic", "domain": domain_slug}, + ) + assert r.status_code == 201 + return r.json() + + +async def _create_workstream(client, topic_id: str): + r = await client.post( + "/workstreams/", + json={"topic_id": topic_id, "slug": "bulk-ws", "title": "Bulk Workstream"}, + ) + assert r.status_code == 201 + return r.json() + + +async def _create_task(client, workstream_id: str, title: str): + r = await client.post( + "/tasks/", + json={"workstream_id": workstream_id, "title": title}, + ) + assert r.status_code == 201 + return r.json() + + +async def _seed_two_tasks(client): + await _create_domain(client) + topic = await _create_topic(client) + ws = await _create_workstream(client, topic["id"]) + first = await _create_task(client, ws["id"], "First bulk task") + second = await _create_task(client, ws["id"], "Second bulk task") + return ws, first, second + + +class TestTaskBulkStatusSync: + async def test_updates_many_tasks_and_emits_progress_events(self, client): + ws, first, second = await _seed_two_tasks(client) + + r = await client.post( + "/tasks/bulk-status-sync", + json={ + "author": "codex", + "session_id": "session-1", + "updates": [ + {"task_id": first["id"], "status": "progress"}, + {"task_id": second["id"], "status": "wait", "blocking_reason": "needs operator"}, + ], + }, + ) + + assert r.status_code == 200 + body = r.json() + assert [task["id"] for task in body["updated"]] == [first["id"], second["id"]] + assert [task["status"] for task in body["updated"]] == ["progress", "wait"] + assert body["updated"][1]["blocking_reason"] == "needs operator" + assert len(body["progress_event_ids"]) == 2 + + progress = await client.get("/progress/", params={"workstream_id": ws["id"]}) + assert progress.status_code == 200 + events = progress.json() + assert [event["id"] for event in events] == body["progress_event_ids"] + assert [event["event_type"] for event in events] == ["task_status_changed", "task_status_changed"] + assert events[0]["author"] == "codex" + assert events[0]["session_id"] == "session-1" + assert events[0]["detail"]["bulk_status_sync"] is True + assert events[0]["detail"]["previous_status"] == "todo" + assert events[0]["detail"]["status"] == "progress" + + async def test_duplicate_task_ids_are_rejected_without_updates(self, client): + _, first, _ = await _seed_two_tasks(client) + + r = await client.post( + "/tasks/bulk-status-sync", + json={ + "updates": [ + {"task_id": first["id"], "status": "progress"}, + {"task_id": first["id"], "status": "done"}, + ], + }, + ) + + assert r.status_code == 400 + assert r.json()["detail"]["task_ids"] == [first["id"]] + + task = await client.get(f"/tasks/{first['id']}") + assert task.status_code == 200 + assert task.json()["status"] == "todo" + + async def test_missing_task_ids_are_rejected_without_updates(self, client): + import uuid + + _, first, _ = await _seed_two_tasks(client) + missing_id = str(uuid.uuid4()) + + r = await client.post( + "/tasks/bulk-status-sync", + json={ + "updates": [ + {"task_id": first["id"], "status": "progress"}, + {"task_id": missing_id, "status": "done"}, + ], + }, + ) + + assert r.status_code == 404 + assert r.json()["detail"]["task_ids"] == [missing_id] + + task = await client.get(f"/tasks/{first['id']}") + assert task.status_code == 200 + assert task.json()["status"] == "todo" diff --git a/workplans/STATE-WP-0058-agent-skill-hub-interaction.md b/workplans/STATE-WP-0058-agent-skill-hub-interaction.md index f48ad60..bc729e5 100644 --- a/workplans/STATE-WP-0058-agent-skill-hub-interaction.md +++ b/workplans/STATE-WP-0058-agent-skill-hub-interaction.md @@ -4,7 +4,7 @@ type: workplan title: "State Hub Agent Skill — front-load tool schemas + batched writes" domain: custodian repo: state-hub -status: ready +status: finished owner: codex topic_slug: custodian created: "2026-06-07" @@ -37,7 +37,7 @@ of sessions). ```task id: STATE-WP-0058-T01 -status: todo +status: done priority: high state_hub_task_id: "6f211ccc-e505-419d-90cc-25874ba98f39" ``` @@ -54,7 +54,7 @@ model). Directly targets the `ToolSearch`-thrash finding. ```task id: STATE-WP-0058-T02 -status: todo +status: done priority: high state_hub_task_id: "2138505a-ea49-4ccc-9dec-a176badaa7a5" ``` @@ -69,7 +69,7 @@ preserve the automatic `progress_event` semantics on each write. ```task id: STATE-WP-0058-T03 -status: todo +status: done priority: medium state_hub_task_id: "cf8d41f8-7831-4c1b-9dfa-58660174294b" ``` @@ -83,3 +83,27 @@ workplan updates, notify the operator to run from `~/state-hub`: ```bash make fix-consistency REPO=state-hub ``` + +## Verification Notes + +Completed 2026-06-07: + +- Added `skills/state-hub/SKILL.md` plus + `skills/state-hub/references/tool-signatures.md` to front-load the + high-frequency State Hub MCP signatures, session flow, ADR-001 files-first + boundary, REST fallbacks, and batched-write guidance. +- Added `POST /tasks/bulk-status-sync` for checkpoint task-status batching. + The endpoint updates all requested task statuses in one transaction, rejects + duplicate task ids with `400`, rejects missing task ids with `404`, and emits + one `task_status_changed` progress event per successful item. +- Added MCP wrapper `bulk_update_task_statuses(updates, author?, session_id?)` + and documented it in `mcp_server/TOOLS.md`. +- Sent completion/signature handoff to `helix_forge` as message + `7236bd1c-b60d-481a-bc05-3080c4b46f72`, followed by tracked-path + correction `1a322eed-96d6-45d4-9e0a-685a6f66e180`. + +Verification: + +- `.venv/bin/python -m pytest tests/test_task_bulk_status_sync.py tests/test_mcp_write_tools.py -q` -> 12 passed +- `.venv/bin/python -m pytest tests/test_task_bulk_status_sync.py tests/test_mcp_write_tools.py tests/test_mcp_smoke.py -q` -> 25 passed +- `git diff --check` -> clean