Files
state-hub/tests/test_task_bulk_status_sync.py
tegwick 0949d4c0d8 feat(classification-spine): implement STATE-WP-0065 repo-anchored model
Replace the ad-hoc coordination-domain spine with the Repo Classification
Standard: 14 market domains, classification columns on managed_repos, and
workplans anchored by repo_id (topic_id optional).

- Add Alembic migration d8e9f0a1b2c3 with data backfill and workstream→workplan rename
- Add api/classification.py validation and register-from-classification tooling
- Expose workplan-first REST/MCP surface with legacy workstream aliases
- Add C-24 consistency rule and legacy domain frontmatter mapping
- Update dashboard repos page with category/capability/stake filters
- Update orientation docs; mark STATE-WP-0065 finished
2026-06-22 13:52:13 +02:00

123 lines
4.3 KiB
Python

from __future__ import annotations
async def _create_domain(client, slug: str = "bulk-domain"):
r = await client.post("/domains/", json={"slug": slug, "name": "Bulk Domain"})
assert r.status_code == 201
return r.json()
async def _create_topic(client, domain_slug: str = "bulk-domain"):
r = await client.post(
"/topics/",
json={"slug": "bulk-topic", "title": "Bulk Topic", "domain": domain_slug},
)
assert r.status_code == 201
return r.json()
from tests.conftest import create_test_repo, create_test_workplan
async def _create_workstream(client, topic_id: str, domain_slug: str = "bulk-domain"):
repo = await create_test_repo(client, domain_slug=domain_slug, slug="bulk-repo")
return await create_test_workplan(
client, repo_id=repo["id"], topic_id=topic_id, slug="bulk-ws", title="Bulk Workstream",
)
async def _create_task(client, workplan_id: str, title: str):
r = await client.post(
"/tasks/",
json={"workplan_id": workplan_id, "title": title},
)
assert r.status_code == 201
return r.json()
async def _seed_two_tasks(client):
await _create_domain(client)
topic = await _create_topic(client)
ws = await _create_workstream(client, topic["id"])
first = await _create_task(client, ws["id"], "First bulk task")
second = await _create_task(client, ws["id"], "Second bulk task")
return ws, first, second
class TestTaskBulkStatusSync:
async def test_updates_many_tasks_and_emits_progress_events(self, client):
ws, first, second = await _seed_two_tasks(client)
r = await client.post(
"/tasks/bulk-status-sync",
json={
"author": "codex",
"session_id": "session-1",
"updates": [
{"task_id": first["id"], "status": "progress"},
{"task_id": second["id"], "status": "wait", "blocking_reason": "needs operator"},
],
},
)
assert r.status_code == 200
body = r.json()
assert [task["id"] for task in body["updated"]] == [first["id"], second["id"]]
assert [task["status"] for task in body["updated"]] == ["progress", "wait"]
assert body["updated"][1]["blocking_reason"] == "needs operator"
assert len(body["progress_event_ids"]) == 2
progress = await client.get("/progress/", params={"workstream_id": ws["id"]})
assert progress.status_code == 200
events = progress.json()
assert [event["id"] for event in events] == body["progress_event_ids"]
assert [event["event_type"] for event in events] == ["task_status_changed", "task_status_changed"]
assert events[0]["author"] == "codex"
assert events[0]["session_id"] == "session-1"
assert events[0]["detail"]["bulk_status_sync"] is True
assert events[0]["detail"]["previous_status"] == "todo"
assert events[0]["detail"]["status"] == "progress"
async def test_duplicate_task_ids_are_rejected_without_updates(self, client):
_, first, _ = await _seed_two_tasks(client)
r = await client.post(
"/tasks/bulk-status-sync",
json={
"updates": [
{"task_id": first["id"], "status": "progress"},
{"task_id": first["id"], "status": "done"},
],
},
)
assert r.status_code == 400
assert r.json()["detail"]["task_ids"] == [first["id"]]
task = await client.get(f"/tasks/{first['id']}")
assert task.status_code == 200
assert task.json()["status"] == "todo"
async def test_missing_task_ids_are_rejected_without_updates(self, client):
import uuid
_, first, _ = await _seed_two_tasks(client)
missing_id = str(uuid.uuid4())
r = await client.post(
"/tasks/bulk-status-sync",
json={
"updates": [
{"task_id": first["id"], "status": "progress"},
{"task_id": missing_id, "status": "done"},
],
},
)
assert r.status_code == 404
assert r.json()["detail"]["task_ids"] == [missing_id]
task = await client.get(f"/tasks/{first['id']}")
assert task.status_code == 200
assert task.json()["status"] == "todo"