# Generated from coulomb/repo-seed (1515 lines, 58 KiB, Python).
"""
Core router tests: domains, repos, topics, workstreams, tasks, decisions,
state summary, flows, reconciliation, and execution queue.

Happy path + key error cases. All tests use a real PostgreSQL test database
(no mocking). The `client` fixture provides an httpx.AsyncClient backed by
the FastAPI ASGI app.
"""
|
|
from __future__ import annotations
|
|
|
|
import pytest
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _create_domain(client, slug="testdomain", name="Test Domain"):
    """Create a domain through ``POST /domains/`` and return its JSON body.

    The calling test fails right here (with the response text as the
    assertion message) unless the API answers 201.
    """
    payload = {"slug": slug, "name": name}
    response = await client.post("/domains/", json=payload)
    assert response.status_code == 201, response.text
    return response.json()
|
|
|
|
|
|
async def _create_topic(client, domain_slug="testdomain", slug="testtopic", title="Test Topic"):
    """Create a topic through ``POST /topics/`` and return its JSON body.

    ``domain_slug`` is sent under the ``domain`` key of the request payload.
    The calling test fails here unless the API answers 201.
    """
    payload = {"slug": slug, "title": title, "domain": domain_slug}
    response = await client.post("/topics/", json=payload)
    assert response.status_code == 201, response.text
    return response.json()
|
|
|
|
|
|
async def _create_workstream(client, topic_id, slug="test-ws", title="Test WS", status="active", **extra):
    """Create a workstream through ``POST /workstreams/`` and return its JSON body.

    Any additional keyword arguments are merged into the request payload
    unchanged (callers in this file pass e.g. ``repo_id``). The calling test
    fails here unless the API answers 201.
    """
    body = {
        "topic_id": topic_id,
        "slug": slug,
        "title": title,
        "status": status,
        **extra,
    }
    response = await client.post("/workstreams/", json=body)
    assert response.status_code == 201, response.text
    return response.json()
|
|
|
|
|
|
async def _create_task(client, workstream_id, title="Test task"):
    """Create a task through ``POST /tasks/`` and return its JSON body.

    The calling test fails here unless the API answers 201.
    """
    payload = {"workstream_id": workstream_id, "title": title}
    response = await client.post("/tasks/", json=payload)
    assert response.status_code == 201, response.text
    return response.json()
|
|
|
|
|
|
async def _create_repo(client, domain_slug="testdomain", slug="test-repo", local_path=None):
    """Create a repo through ``POST /repos/`` and return its JSON body.

    ``local_path`` may be any object accepted by ``str()`` (tests in this file
    pass ``pathlib.Path`` values); a falsy value is sent as JSON ``null``.
    The calling test fails here unless the API answers 201.
    """
    # Only stringify when a path was actually supplied.
    path_value = None
    if local_path:
        path_value = str(local_path)

    payload = {
        "domain_slug": domain_slug,
        "slug": slug,
        "name": "Test Repo",
        "local_path": path_value,
    }
    response = await client.post("/repos/", json=payload)
    assert response.status_code == 201, response.text
    return response.json()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Domain tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestDomains:
    """Creation, listing and slug-uniqueness checks for the ``/domains/`` router."""

    async def test_create_and_list(self, client):
        """A created domain appears in the ``GET /domains/`` listing."""
        await _create_domain(client)

        listing = await client.get("/domains/")

        assert listing.status_code == 200
        listed_slugs = [entry["slug"] for entry in listing.json()]
        assert "testdomain" in listed_slugs

    async def test_duplicate_slug_returns_409(self, client):
        """Posting a second domain with an existing slug is answered with 409."""
        await _create_domain(client)

        duplicate_payload = {"slug": "testdomain", "name": "Dupe"}
        duplicate = await client.post("/domains/", json=duplicate_payload)

        assert duplicate.status_code == 409
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Repo tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestRepos:
    """Checks for the ``/repos/`` router."""

    async def test_create_persists_host_paths(self, client):
        """The ``host_paths`` mapping sent on creation is echoed back unchanged."""
        await _create_domain(client)
        host_paths = {"workstation": "/srv/hosted-repo"}

        created = await client.post(
            "/repos/",
            json={
                "domain_slug": "testdomain",
                "slug": "hosted-repo",
                "name": "Hosted Repo",
                "host_paths": host_paths,
            },
        )

        assert created.status_code == 201
        assert created.json()["host_paths"] == {"workstation": "/srv/hosted-repo"}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Topic tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestTopics:
    """Creation, lookup and error-case checks for the ``/topics/`` router."""

    async def test_create_and_get(self, client):
        """A created topic can be fetched by id and keeps its slug."""
        await _create_domain(client)
        created = await _create_topic(client)

        fetched = await client.get(f"/topics/{created['id']}")

        assert fetched.status_code == 200
        assert fetched.json()["slug"] == "testtopic"

    async def test_duplicate_slug_returns_409(self, client):
        """Posting a second topic with an existing slug is answered with 409."""
        await _create_domain(client)
        await _create_topic(client)

        duplicate_payload = {"slug": "testtopic", "title": "Dupe", "domain": "testdomain"}
        duplicate = await client.post("/topics/", json=duplicate_payload)

        assert duplicate.status_code == 409

    async def test_unknown_domain_returns_404(self, client):
        """Creating a topic under a domain that was never created is answered with 404."""
        orphan_payload = {"slug": "x", "title": "X", "domain": "doesnotexist"}

        rejected = await client.post("/topics/", json=orphan_payload)

        assert rejected.status_code == 404
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Workstream tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestWorkstreams:
    """Workstream router: creation, list filters, status changes, workplan index."""

    async def test_create_and_list_by_topic(self, client):
        """A created workstream is returned when listing by its topic id."""
        await _create_domain(client)
        topic = await _create_topic(client)
        ws = await _create_workstream(client, topic["id"])

        r = await client.get(f"/workstreams/?topic_id={topic['id']}")
        assert r.status_code == 200
        ids = [w["id"] for w in r.json()]
        assert ws["id"] in ids

    async def test_status_transition(self, client):
        """PATCHing ``status`` to ``finished`` is accepted and reflected in the response."""
        await _create_domain(client)
        topic = await _create_topic(client)
        ws = await _create_workstream(client, topic["id"])

        r = await client.patch(f"/workstreams/{ws['id']}", json={"status": "finished"})
        assert r.status_code == 200
        assert r.json()["status"] == "finished"

    async def test_legacy_completed_status_is_normalized(self, client):
        """PATCHing the legacy value ``completed`` is expected to come back as ``finished``."""
        await _create_domain(client)
        topic = await _create_topic(client)
        ws = await _create_workstream(client, topic["id"])

        r = await client.patch(f"/workstreams/{ws['id']}", json={"status": "completed"})
        assert r.status_code == 200
        assert r.json()["status"] == "finished"

    async def test_filter_by_owner(self, client):
        """Listing with ``?owner=alice`` returns exactly the workstream owned by alice."""
        await _create_domain(client)
        topic = await _create_topic(client)
        ws = await _create_workstream(client, topic["id"])
        # Verify the setup step so a rejected PATCH is reported here rather
        # than as an empty filter result below.
        r = await client.patch(f"/workstreams/{ws['id']}", json={"owner": "alice"})
        assert r.status_code == 200, r.text

        r = await client.get("/workstreams/?owner=alice")
        assert r.status_code == 200
        assert len(r.json()) == 1
        assert r.json()[0]["id"] == ws["id"]

    async def test_filter_by_slug(self, client):
        """Listing with ``?slug=...`` returns exactly the workstream created with that slug."""
        await _create_domain(client)
        topic = await _create_topic(client)
        ws = await _create_workstream(client, topic["id"], slug="my-special-ws")

        r = await client.get("/workstreams/?slug=my-special-ws")
        assert r.status_code == 200
        matches = r.json()
        assert len(matches) == 1
        # Make sure the single hit is the workstream created above, not
        # merely some row.
        assert matches[0]["id"] == ws["id"]

    async def test_workplan_index_route(self, client):
        """``GET /workstreams/workplan-index`` answers 200 with a ``workstreams`` key."""
        r = await client.get("/workstreams/workplan-index")
        assert r.status_code == 200
        assert "workstreams" in r.json()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Task tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestTasks:
    """Task router: creation, filters, pagination, counts and workstream side effects."""

    async def test_create_and_list_by_workstream(self, client):
        """A created task is returned when listing by its workstream id."""
        await _create_domain(client)
        topic = await _create_topic(client)
        ws = await _create_workstream(client, topic["id"])
        task = await _create_task(client, ws["id"])

        r = await client.get(f"/tasks/?workstream_id={ws['id']}")
        assert r.status_code == 200
        ids = [t["id"] for t in r.json()]
        assert task["id"] in ids

    async def test_needs_human_flag(self, client):
        """Setting ``needs_human`` together with an intervention note is accepted."""
        await _create_domain(client)
        topic = await _create_topic(client)
        ws = await _create_workstream(client, topic["id"])
        task = await _create_task(client, ws["id"])

        r = await client.patch(f"/tasks/{task['id']}", json={
            "needs_human": True,
            "intervention_note": "needs review",
        })
        assert r.status_code == 200
        assert r.json()["needs_human"] is True

    async def test_needs_human_without_note_returns_422(self, client):
        """Setting ``needs_human`` without an intervention note is answered with 422."""
        await _create_domain(client)
        topic = await _create_topic(client)
        ws = await _create_workstream(client, topic["id"])
        task = await _create_task(client, ws["id"])

        r = await client.patch(f"/tasks/{task['id']}", json={"needs_human": True})
        assert r.status_code == 422

    async def test_cancel_via_delete(self, client):
        """DELETE on a task is expected to answer 200 with status ``cancel``."""
        await _create_domain(client)
        topic = await _create_topic(client)
        ws = await _create_workstream(client, topic["id"])
        task = await _create_task(client, ws["id"])

        r = await client.delete(f"/tasks/{task['id']}")
        assert r.status_code == 200
        assert r.json()["status"] == "cancel"

    async def test_filter_by_priority(self, client):
        """Listing with ``priority=high`` includes the high task and excludes the low one."""
        await _create_domain(client)
        topic = await _create_topic(client)
        ws = await _create_workstream(client, topic["id"])
        for title, priority in (("High prio", "high"), ("Low prio", "low")):
            created = await client.post("/tasks/", json={
                "workstream_id": ws["id"], "title": title, "priority": priority,
            })
            # Fail at the setup step if a fixture task could not be created;
            # otherwise the "not in titles" check below could pass vacuously.
            assert created.status_code == 201, created.text

        r = await client.get(f"/tasks/?workstream_id={ws['id']}&priority=high")
        assert r.status_code == 200
        titles = [t["title"] for t in r.json()]
        assert "High prio" in titles
        assert "Low prio" not in titles

    async def test_list_pagination_and_counts(self, client):
        """``limit``/``offset`` page through tasks and ``/tasks/counts`` groups them by status."""
        await _create_domain(client)
        topic = await _create_topic(client)
        ws = await _create_workstream(client, topic["id"])
        first = await _create_task(client, ws["id"], title="First")
        second = await _create_task(client, ws["id"], title="Second")
        third = await _create_task(client, ws["id"], title="Third")
        # Move two of the three tasks out of their initial status so that the
        # counts endpoint has three distinct buckets to report. Check each
        # setup PATCH so a rejection is reported at its source.
        r = await client.patch(f"/tasks/{second['id']}", json={"status": "progress"})
        assert r.status_code == 200, r.text
        r = await client.patch(
            f"/tasks/{third['id']}",
            json={"status": "wait", "blocking_reason": "blocked"},
        )
        assert r.status_code == 200, r.text

        # NOTE(review): the unfiltered listing below presumably relies on the
        # test database containing only this test's tasks, returned in
        # creation order — confirm against the fixture setup.
        r = await client.get("/tasks/?limit=2")
        assert r.status_code == 200
        body = r.json()
        assert len(body) == 2
        assert body[0]["id"] == first["id"]
        assert body[1]["id"] == second["id"]

        r = await client.get("/tasks/?limit=1&offset=2")
        assert r.status_code == 200
        assert [task["id"] for task in r.json()] == [third["id"]]

        r = await client.get(f"/tasks/counts?workstream_id={ws['id']}")
        assert r.status_code == 200
        counts = {(row["workstream_id"], row["status"]): row["count"] for row in r.json()}
        assert counts[(ws["id"], "todo")] == 1
        assert counts[(ws["id"], "progress")] == 1
        assert counts[(ws["id"], "wait")] == 1

    @pytest.mark.parametrize("initial_status", ["proposed", "ready", "backlog"])
    async def test_task_start_activates_planning_workstream(self, client, initial_status):
        """Starting a task in a proposed/ready/backlog workstream leaves that workstream ``active``."""
        await _create_domain(client)
        topic = await _create_topic(client)
        ws = await _create_workstream(
            client,
            topic["id"],
            slug=f"{initial_status}-ws",
            status=initial_status,
        )
        task = await _create_task(client, ws["id"])

        r = await client.patch(f"/tasks/{task['id']}", json={"status": "progress"})
        assert r.status_code == 200

        r = await client.get(f"/workstreams/{ws['id']}")
        assert r.status_code == 200
        assert r.json()["status"] == "active"

    async def test_task_start_does_not_unblock_blocked_workstream(self, client):
        """Starting a task in a ``blocked`` workstream leaves the workstream ``blocked``."""
        await _create_domain(client)
        topic = await _create_topic(client)
        ws = await _create_workstream(client, topic["id"], slug="blocked-ws", status="blocked")
        task = await _create_task(client, ws["id"])

        r = await client.patch(f"/tasks/{task['id']}", json={"status": "progress"})
        assert r.status_code == 200

        r = await client.get(f"/workstreams/{ws['id']}")
        assert r.status_code == 200
        assert r.json()["status"] == "blocked"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Decision tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestDecisions:
    """Decision router: creation, resolution and automatic escalation."""

    async def test_create_and_resolve(self, client):
        """A created decision can be resolved once and then reports status ``resolved``."""
        await _create_domain(client)
        topic = await _create_topic(client)
        r = await client.post("/decisions/", json={
            "title": "Should we use X?",
            "topic_id": topic["id"],
        })
        assert r.status_code == 201
        d_id = r.json()["id"]

        r2 = await client.post(f"/decisions/{d_id}/resolve", json={
            "rationale": "Yes, use X.",
            "decided_by": "bernd",
        })
        assert r2.status_code == 200
        assert r2.json()["status"] == "resolved"

    async def test_resolve_already_resolved_returns_409(self, client):
        """Resolving a decision a second time is answered with 409."""
        await _create_domain(client)
        topic = await _create_topic(client)
        r = await client.post("/decisions/", json={
            "title": "Already done", "topic_id": topic["id"],
        })
        # Check the setup calls so a failure is reported at its source
        # instead of as a KeyError on "id" or a misleading status mismatch.
        assert r.status_code == 201, r.text
        d_id = r.json()["id"]
        first = await client.post(
            f"/decisions/{d_id}/resolve",
            json={"rationale": "Done", "decided_by": "bernd"},
        )
        assert first.status_code == 200, first.text

        r2 = await client.post(
            f"/decisions/{d_id}/resolve",
            json={"rationale": "Again", "decided_by": "bernd"},
        )
        assert r2.status_code == 409

    async def test_financial_keyword_auto_escalates(self, client):
        """A decision titled "Purchase cloud credits" is created as ``escalated`` with a note."""
        await _create_domain(client)
        topic = await _create_topic(client)
        r = await client.post("/decisions/", json={
            "title": "Purchase cloud credits",
            "topic_id": topic["id"],
            "decision_type": "pending",
        })
        assert r.status_code == 201
        body = r.json()
        assert body["status"] == "escalated"
        assert body["escalation_note"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# State summary
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestStateSummary:
    """Checks for the aggregate ``/state/summary`` and ``/state/overview`` endpoints."""

    async def test_summary_returns_expected_shape(self, client):
        """The summary answers 200 and exposes the four top-level keys checked below."""
        r = await client.get("/state/summary")
        assert r.status_code == 200
        body = r.json()
        assert "open_workstreams" in body
        assert "blocking_decisions" in body
        assert "blocked_tasks" in body
        assert "domains" in body

    async def test_summary_counts_reflect_created_data(self, client):
        """A task moved to ``wait`` shows up under ``waiting_tasks`` in the summary."""
        await _create_domain(client)
        topic = await _create_topic(client)
        ws = await _create_workstream(client, topic["id"])
        task = await _create_task(client, ws["id"])
        # Mark as wait so it shows in waiting_tasks
        r = await client.patch(
            f"/tasks/{task['id']}",
            json={"status": "wait", "blocking_reason": "waiting on dep"},
        )
        assert r.status_code == 200, r.text

        r = await client.get("/state/summary")
        # Check the status first so an error response is not reported as a
        # KeyError on "waiting_tasks".
        assert r.status_code == 200, r.text
        body = r.json()
        # NOTE(review): the shape test above checks for "blocked_tasks" while
        # this test reads "waiting_tasks" — confirm the summary payload is
        # meant to expose both keys.
        assert len(body["waiting_tasks"]) >= 1

    async def test_summary_derives_blocked_workstream_from_flow_engine(self, client):
        """A workstream with an unfinished dependency is summarised as ``blocked``."""
        await _create_domain(client)
        topic = await _create_topic(client)
        blocked_ws = await _create_workstream(client, topic["id"], slug="blocked-ws")
        dependency_ws = await _create_workstream(client, topic["id"], slug="dependency-ws")

        r = await client.post(
            f"/workstreams/{blocked_ws['id']}/dependencies/",
            json={
                "to_workstream_id": dependency_ws["id"],
                "description": "Blocked until dependency completes",
            },
        )
        assert r.status_code == 201

        r = await client.get("/state/summary")
        assert r.status_code == 200
        body = r.json()
        summaries = {item["id"]: item for item in body["open_workstreams"]}

        assert summaries[blocked_ws["id"]]["status"] == "blocked"
        assert summaries[blocked_ws["id"]]["blocked_reasons"][0]["id"] == "dependencies.all_complete"
        assert body["totals"]["workstreams"]["blocked"] == 1

    async def test_overview_returns_chart_ready_rows(self, client):
        """The overview reports per-workstream task counts and a miss-then-hit cache header."""
        await _create_domain(client)
        topic = await _create_topic(client)
        repo = await _create_repo(client)
        ws = await _create_workstream(client, topic["id"], repo_id=repo["id"])
        # One task stays in its initial status, the other is moved to "done".
        await _create_task(client, ws["id"], title="Todo")
        second = await _create_task(client, ws["id"], title="Done")
        r = await client.patch(
            f"/tasks/{second['id']}",
            json={"status": "done", "suppress_token_event": True},
        )
        assert r.status_code == 200, r.text

        r = await client.get("/state/overview")
        assert r.status_code == 200
        assert r.headers["x-statehub-cache"] == "miss"
        body = r.json()

        rows = {row["id"]: row for row in body["workplan_rows"]}
        assert ws["id"] in rows
        assert rows[ws["id"]]["repo_label"] == "test-repo"
        assert rows[ws["id"]]["domain"] == "testdomain"
        assert rows[ws["id"]]["todo"] == 1
        assert rows[ws["id"]]["done"] == 1
        assert rows[ws["id"]]["total"] == 2
        assert body["totals"]["tasks"]["total"] == 2
        assert body["diagnostics"]["task_count_strategy"] == "grouped"

        # An immediate second request is expected to be served from cache.
        r = await client.get("/state/overview")
        assert r.status_code == 200
        assert r.headers["x-statehub-cache"] == "hit"
|
|
|
|
|
|
class TestFlowEndpoints:
    """Checks for the ``/flows/`` endpoints (definitions, state lookup, advance)."""

    async def test_list_flow_definitions(self, client):
        """The definitions listing covers the four entity types checked below."""
        r = await client.get("/flows/definitions")
        assert r.status_code == 200
        entity_types = {item["entity_type"] for item in r.json()}
        assert {"workstream", "task", "contribution", "capability_request"} <= entity_types

    async def test_get_flow_state_and_advance_workstream(self, client):
        """With its only task done, a workstream can be advanced to ``finished``."""
        await _create_domain(client)
        topic = await _create_topic(client)
        ws = await _create_workstream(client, topic["id"])
        task = await _create_task(client, ws["id"])
        # Check the setup PATCH so a rejected update is reported here rather
        # than as "finished" missing from the reachable set.
        r = await client.patch(f"/tasks/{task['id']}", json={"status": "done"})
        assert r.status_code == 200, r.text

        r = await client.get(f"/flows/workstream/{ws['id']}")
        assert r.status_code == 200
        assert "finished" in r.json()["reachable"]

        r = await client.post(f"/flows/workstream/{ws['id']}/advance/finished")
        assert r.status_code == 200
        assert r.json()["current_workstation"] == "finished"

        r = await client.get(f"/workstreams/{ws['id']}")
        assert r.status_code == 200
        assert r.json()["status"] == "finished"

    async def test_advance_workstream_respects_current_exit_assertions(self, client):
        """Advancing to ``finished`` is refused with 409 while a dependency is unfinished."""
        await _create_domain(client)
        topic = await _create_topic(client)
        ws = await _create_workstream(client, topic["id"], slug="exit-blocked-ws")
        dependency_ws = await _create_workstream(client, topic["id"], slug="unfinished-dep")
        task = await _create_task(client, ws["id"])
        r = await client.patch(f"/tasks/{task['id']}", json={"status": "done"})
        assert r.status_code == 200, r.text
        r = await client.post(
            f"/workstreams/{ws['id']}/dependencies/",
            json={
                "to_workstream_id": dependency_ws["id"],
                "description": "Dependency must finish first",
            },
        )
        # Without the dependency in place the 409 below would be meaningless,
        # so make sure it was actually created.
        assert r.status_code == 201, r.text

        r = await client.post(f"/flows/workstream/{ws['id']}/advance/finished")
        assert r.status_code == 409
        assert r.json()["detail"]["blocking_assertions"][0]["id"] == "dependencies.all_complete"

        # The refused advance must leave the workstream status untouched.
        r = await client.get(f"/workstreams/{ws['id']}")
        assert r.status_code == 200
        assert r.json()["status"] == "active"
|
|
|
|
|
|
class TestReconciliationEndpoints:
    """Checks for ``POST /reconciliation/state-change``.

    The first group of tests only classifies a requested status change; the
    ``test_apply_*`` tests send ``"apply": True`` and verify what happens to
    the workplan file on disk, the database row, and the reconciliation
    messages.
    """

    @staticmethod
    def _make_workplans_dir(tmp_path):
        """Create ``<tmp_path>/repo/workplans`` and return ``(repo_root, workplans)``."""
        repo_root = tmp_path / "repo"
        workplans = repo_root / "workplans"
        workplans.mkdir(parents=True)
        return repo_root, workplans

    @staticmethod
    def _write_workplan(workplans, workstream_id, *, status="active", task_id=None):
        """Write the demo workplan file and return its path.

        The file always carries the front matter linking it to
        ``workstream_id`` with the given ``status``. When ``task_id`` is
        given, a single ``task`` block (status ``todo``, priority ``high``)
        linked to that task is appended.
        """
        text = (
            "---\n"
            "id: STATE-WP-9999\n"
            "type: workplan\n"
            "title: Demo\n"
            "domain: custodian\n"
            "repo: state-hub\n"
            f"status: {status}\n"
            f"state_hub_workstream_id: \"{workstream_id}\"\n"
            "---\n"
        )
        if task_id is not None:
            text += (
                "\n"
                "## Demo Task\n\n"
                "```task\n"
                "id: STATE-WP-9999-T01\n"
                "status: todo\n"
                "priority: high\n"
                f"state_hub_task_id: \"{task_id}\"\n"
                "```\n"
            )
        wp = workplans / "STATE-WP-9999-demo.md"
        wp.write_text(text, encoding="utf-8")
        return wp

    async def test_classify_workstream_open_transition_write_through(self, client):
        """An active -> backlog request without ``apply`` is classified but not attempted."""
        await _create_domain(client)
        topic = await _create_topic(client)
        ws = await _create_workstream(client, topic["id"])

        r = await client.post("/reconciliation/state-change", json={
            "target_type": "workstream",
            "target_id": ws["id"],
            "target_status": "backlog",
            "actor": "dashboard",
            "intent": "push back to backlog",
            "file_backed": True,
        })

        assert r.status_code == 200
        body = r.json()
        assert body["current_status"] == "active"
        assert body["target_status"] == "backlog"
        assert body["reconciliation_class"] == "write_through"
        assert body["write_through_result"] == "not_attempted"
        assert body["intent"] == "push back to backlog"

    async def test_classify_workstream_finish_with_open_task_needs_confirmation(self, client):
        """Finishing a workstream that still has an open task needs human confirmation."""
        await _create_domain(client)
        topic = await _create_topic(client)
        ws = await _create_workstream(client, topic["id"])
        await _create_task(client, ws["id"])

        r = await client.post("/reconciliation/state-change", json={
            "target_type": "workstream",
            "target_id": ws["id"],
            "target_status": "finished",
            "file_backed": True,
        })

        assert r.status_code == 200
        body = r.json()
        assert body["tasks_terminal"] is False
        assert body["reconciliation_class"] == "human_confirmation"
        assert "open work" in body["reason"]

    async def test_classify_task_wait_without_reason_needs_confirmation(self, client):
        """Moving a task to ``wait`` without a reason needs human confirmation."""
        await _create_domain(client)
        topic = await _create_topic(client)
        ws = await _create_workstream(client, topic["id"])
        task = await _create_task(client, ws["id"])

        r = await client.post("/reconciliation/state-change", json={
            "target_type": "task",
            "target_id": task["id"],
            "target_status": "wait",
            "file_backed": True,
            "task_linked": True,
        })

        assert r.status_code == 200
        body = r.json()
        assert body["current_status"] == "todo"
        assert body["reconciliation_class"] == "human_confirmation"
        assert "wait condition" in body["reason"]

    async def test_classify_unknown_workstream_returns_404(self, client):
        """A state change for a workstream id that was never created is answered with 404."""
        r = await client.post("/reconciliation/state-change", json={
            "target_type": "workstream",
            "target_id": "00000000-0000-0000-0000-000000000001",
            "target_status": "active",
        })

        assert r.status_code == 404

    async def test_apply_workstream_write_through_patches_file_then_db(self, client, tmp_path):
        """Applying active -> backlog updates both the workplan file and the workstream row."""
        await _create_domain(client)
        repo_root, workplans = self._make_workplans_dir(tmp_path)
        repo = await _create_repo(client, local_path=repo_root)
        topic = await _create_topic(client)
        ws = await _create_workstream(client, topic["id"], repo_id=repo["id"])
        wp = self._write_workplan(workplans, ws["id"])

        r = await client.post("/reconciliation/state-change", json={
            "target_type": "workstream",
            "target_id": ws["id"],
            "target_status": "backlog",
            "actor": "dashboard",
            "apply": True,
        })

        assert r.status_code == 200, r.text
        body = r.json()
        assert body["reconciliation_class"] == "write_through"
        assert body["write_through_result"] == "applied"
        assert body["workplan_path"] == "workplans/STATE-WP-9999-demo.md"
        assert "status: backlog" in wp.read_text(encoding="utf-8")

        r = await client.get(f"/workstreams/{ws['id']}")
        assert r.json()["status"] == "backlog"

    async def test_apply_workstream_without_file_does_not_mutate_db(self, client):
        """Without a workplan file the change is deferred and recorded as a message."""
        await _create_domain(client)
        topic = await _create_topic(client)
        ws = await _create_workstream(client, topic["id"])

        r = await client.post("/reconciliation/state-change", json={
            "target_type": "workstream",
            "target_id": ws["id"],
            "target_status": "backlog",
            "file_backed": True,
            "apply": True,
        })

        assert r.status_code == 200, r.text
        body = r.json()
        assert body["file_backed"] is False
        assert body["reconciliation_class"] == "deferred"
        assert body["write_through_result"] == "not_applicable"
        assert body["reconciliation_record_id"]

        r = await client.get(f"/workstreams/{ws['id']}")
        assert r.json()["status"] == "active"

        r = await client.get("/messages/?to_agent=state-hub&unread_only=true")
        assert r.status_code == 200
        messages = r.json()
        assert len(messages) == 1
        assert messages[0]["id"] == body["reconciliation_record_id"]
        assert "Reconcile workstream state change" in messages[0]["subject"]
        assert ws["id"] in messages[0]["body"]

    async def test_apply_task_write_through_patches_task_block_then_db(self, client, tmp_path):
        """Applying todo -> progress updates both the task block in the file and the task row."""
        await _create_domain(client)
        repo_root, workplans = self._make_workplans_dir(tmp_path)
        repo = await _create_repo(client, local_path=repo_root)
        topic = await _create_topic(client)
        ws = await _create_workstream(client, topic["id"], repo_id=repo["id"])
        task = await _create_task(client, ws["id"])
        wp = self._write_workplan(workplans, ws["id"], task_id=task["id"])

        r = await client.post("/reconciliation/state-change", json={
            "target_type": "task",
            "target_id": task["id"],
            "target_status": "progress",
            "actor": "dashboard",
            "apply": True,
        })

        assert r.status_code == 200, r.text
        body = r.json()
        assert body["reconciliation_class"] == "write_through"
        assert body["write_through_result"] == "applied"
        assert body["workplan_path"] == "workplans/STATE-WP-9999-demo.md"
        assert "status: progress" in wp.read_text(encoding="utf-8")

        r = await client.get(f"/tasks/{task['id']}")
        assert r.json()["status"] == "progress"

    async def test_apply_task_start_write_through_activates_parent_file_and_db(self, client, tmp_path):
        """Starting a task in a ``ready`` workstream also activates the workstream, in file and DB."""
        await _create_domain(client)
        repo_root, workplans = self._make_workplans_dir(tmp_path)
        repo = await _create_repo(client, local_path=repo_root)
        topic = await _create_topic(client)
        ws = await _create_workstream(client, topic["id"], repo_id=repo["id"], status="ready")
        task = await _create_task(client, ws["id"])
        wp = self._write_workplan(workplans, ws["id"], status="ready", task_id=task["id"])

        r = await client.post("/reconciliation/state-change", json={
            "target_type": "task",
            "target_id": task["id"],
            "target_status": "progress",
            "actor": "dashboard",
            "expected_current_status": "todo",
            "apply": True,
        })

        assert r.status_code == 200, r.text
        body = r.json()
        assert body["write_through_result"] == "applied"
        text = wp.read_text(encoding="utf-8")
        assert "status: active" in text
        assert "status: progress" in text

        r = await client.get(f"/workstreams/{ws['id']}")
        assert r.json()["status"] == "active"

        r = await client.get(f"/tasks/{task['id']}")
        assert r.json()["status"] == "progress"

    async def test_apply_task_confirmation_case_creates_reconciliation_message(self, client, tmp_path):
        """A todo -> wait request without a reason is not applied and produces one message."""
        await _create_domain(client)
        repo_root, workplans = self._make_workplans_dir(tmp_path)
        repo = await _create_repo(client, local_path=repo_root)
        topic = await _create_topic(client)
        ws = await _create_workstream(client, topic["id"], repo_id=repo["id"])
        task = await _create_task(client, ws["id"])
        self._write_workplan(workplans, ws["id"], task_id=task["id"])

        r = await client.post("/reconciliation/state-change", json={
            "target_type": "task",
            "target_id": task["id"],
            "target_status": "wait",
            "apply": True,
        })

        assert r.status_code == 200, r.text
        body = r.json()
        assert body["reconciliation_class"] == "human_confirmation"
        assert body["write_through_result"] == "not_applicable"
        assert body["reconciliation_record_id"]

        r = await client.get(f"/tasks/{task['id']}")
        assert r.json()["status"] == "todo"

        r = await client.get("/messages/?to_agent=state-hub&unread_only=true")
        # Same status check as in the deferred-workstream test, so an error
        # payload is not mistaken for a message list.
        assert r.status_code == 200
        messages = r.json()
        assert len(messages) == 1
        assert "wait reason" in messages[0]["body"] or "wait condition" in messages[0]["body"]

    async def test_apply_workstream_stale_expected_status_creates_conflict_message(self, client, tmp_path):
        """A stale ``expected_current_status`` yields a conflict; file and DB stay unchanged."""
        await _create_domain(client)
        repo_root, workplans = self._make_workplans_dir(tmp_path)
        repo = await _create_repo(client, local_path=repo_root)
        topic = await _create_topic(client)
        ws = await _create_workstream(client, topic["id"], repo_id=repo["id"])
        wp = self._write_workplan(workplans, ws["id"])
        # Move the DB row away from "active" so the expectation sent below is stale.
        r = await client.patch(f"/workstreams/{ws['id']}", json={"status": "ready"})
        assert r.status_code == 200, r.text

        r = await client.post("/reconciliation/state-change", json={
            "target_type": "workstream",
            "target_id": ws["id"],
            "target_status": "backlog",
            "expected_current_status": "active",
            "apply": True,
        })

        assert r.status_code == 200, r.text
        body = r.json()
        assert body["conflict"] is True
        assert body["write_through_result"] == "not_applicable"
        assert body["current_status"] == "ready"
        assert "expected" in body["reason"]
        assert "status: active" in wp.read_text(encoding="utf-8")

        r = await client.get(f"/workstreams/{ws['id']}")
        assert r.json()["status"] == "ready"

    async def test_apply_workstream_file_status_drift_creates_conflict_message(self, client, tmp_path):
        """A file status that differs from the DB status yields a conflict; nothing is changed."""
        await _create_domain(client)
        repo_root, workplans = self._make_workplans_dir(tmp_path)
        repo = await _create_repo(client, local_path=repo_root)
        topic = await _create_topic(client)
        ws = await _create_workstream(client, topic["id"], repo_id=repo["id"])
        # The DB row is "active" (helper default) while the file says "ready".
        wp = self._write_workplan(workplans, ws["id"], status="ready")

        r = await client.post("/reconciliation/state-change", json={
            "target_type": "workstream",
            "target_id": ws["id"],
            "target_status": "backlog",
            "expected_current_status": "active",
            "apply": True,
        })

        assert r.status_code == 200, r.text
        body = r.json()
        assert body["conflict"] is True
        assert body["write_through_result"] == "not_applicable"
        assert "differs from cached DB status" in body["reason"]
        assert "status: ready" in wp.read_text(encoding="utf-8")

        r = await client.get(f"/workstreams/{ws['id']}")
        assert r.json()["status"] == "active"

    async def test_apply_task_unavailable_host_path_creates_conflict_message(self, client, tmp_path):
        """A repo whose ``local_path`` does not exist on disk yields a conflict; the task is untouched."""
        await _create_domain(client)
        # The directory is deliberately never created.
        repo = await _create_repo(client, local_path=tmp_path / "missing-repo")
        topic = await _create_topic(client)
        ws = await _create_workstream(client, topic["id"], repo_id=repo["id"])
        task = await _create_task(client, ws["id"])

        r = await client.post("/reconciliation/state-change", json={
            "target_type": "task",
            "target_id": task["id"],
            "target_status": "progress",
            "expected_current_status": "todo",
            "apply": True,
        })

        assert r.status_code == 200, r.text
        body = r.json()
        assert body["conflict"] is True
        assert body["write_through_result"] == "not_applicable"
        assert "host path" in body["reason"]
        assert body["reconciliation_record_id"]

        r = await client.get(f"/tasks/{task['id']}")
        assert r.json()["status"] == "todo"
|
|
|
|
|
|
class TestExecutionQueueEndpoints:
    """Tests for the ``/execution`` endpoints.

    Covers the semantics description, workstream execution intent updates,
    the workplan stack ordering and launch requests.
    """

    async def test_execution_semantics_separates_state_hub_and_activity_core(self, client):
        """``GET /execution/semantics`` lists the vocabularies and both responsibility lists."""
        r = await client.get("/execution/semantics")

        assert r.status_code == 200, r.text
        body = r.json()
        assert "queued" in body["execution_states"]
        assert "immediate" in body["launch_modes"]
        assert "parallel" in body["concurrency_modes"]
        assert any("launch requests" in item for item in body["state_hub_responsibility"])
        assert any("dispatch" in item for item in body["activity_core_responsibility"])

    async def test_execution_intent_update_does_not_change_lifecycle_status(self, client):
        """Patching execution intent echoes the new values and keeps ``status`` at ``ready``."""
        await _create_domain(client)
        topic = await _create_topic(client)
        ws = await _create_workstream(client, topic["id"], status="ready")

        r = await client.patch(f"/execution/workstreams/{ws['id']}/intent", json={
            "execution_state": "queued",
            "launch_mode": "queued",
            "concurrency_mode": "parallel",
            "queue_rank": 7,
            "execution_group": "ui-state",
        })

        assert r.status_code == 200, r.text
        body = r.json()
        assert body["execution_state"] == "queued"
        assert body["launch_mode"] == "queued"
        assert body["concurrency_mode"] == "parallel"
        assert body["queue_rank"] == 7

        r = await client.get(f"/workstreams/{ws['id']}")
        assert r.status_code == 200, r.text
        stored = r.json()
        # Lifecycle status is unchanged; only the execution state moved.
        assert stored["status"] == "ready"
        assert stored["execution_state"] == "queued"

    async def test_workplan_stack_orders_eligible_queued_work_before_blocked(self, client):
        """The workplan stack lists the eligible queued workstream before ineligible ones.

        Three workstreams are queued: one plain ``ready`` one, one ``ready``
        one that depends on another workstream, and one whose lifecycle status
        is ``blocked``. With ``include_manual=false`` the stack is expected to
        return exactly those three, the eligible one first even though its
        ``queue_rank`` is the highest of the three.
        """
        await _create_domain(client)
        topic = await _create_topic(client)
        queued = await _create_workstream(
            client,
            topic["id"],
            slug="queued-wp",
            status="ready",
            planning_priority="high",
            planning_order=2,
        )
        blocked = await _create_workstream(
            client,
            topic["id"],
            slug="blocked-wp",
            status="ready",
            planning_priority="high",
            planning_order=1,
        )
        lifecycle_blocked = await _create_workstream(
            client,
            topic["id"],
            slug="lifecycle-blocked-wp",
            status="blocked",
            planning_priority="high",
            planning_order=0,
        )
        # Never queued itself; it only serves as the dependency target below.
        dependency = await _create_workstream(
            client,
            topic["id"],
            slug="dependency-wp",
            status="active",
            planning_priority="low",
            planning_order=3,
        )

        # Queue the three candidates. Setup responses are checked so a failed
        # setup call is reported here rather than as a confusing ordering diff.
        for workstream, rank in ((queued, 2), (blocked, 1), (lifecycle_blocked, 0)):
            r = await client.patch(f"/execution/workstreams/{workstream['id']}/intent", json={
                "execution_state": "queued",
                "launch_mode": "queued",
                "queue_rank": rank,
            })
            assert r.status_code == 200, r.text

        r = await client.post(
            f"/workstreams/{blocked['id']}/dependencies/",
            json={"to_workstream_id": dependency["id"], "description": "wait"},
        )
        assert 200 <= r.status_code < 300, r.text

        r = await client.get("/execution/workplan-stack?include_manual=false")

        assert r.status_code == 200, r.text
        rows = r.json()
        assert [row["slug"] for row in rows] == ["queued-wp", "lifecycle-blocked-wp", "blocked-wp"]
        assert rows[0]["eligible"] is True
        assert rows[1]["eligible"] is False
        assert rows[1]["status"] == "blocked"
        assert rows[2]["eligible"] is False
        assert rows[2]["blocked_by_workstream_ids"] == [dependency["id"]]

    async def test_launch_request_records_handoff_and_updates_execution_intent(self, client):
        """A launch request is stored as ``requested`` and moves the workstream to ``launching``."""
        await _create_domain(client)
        topic = await _create_topic(client)
        ws = await _create_workstream(client, topic["id"], status="ready")

        r = await client.post("/execution/launch-requests", json={
            "workstream_id": ws["id"],
            "requested_by": "dashboard",
            "requested_actor": "activity-core",
            "launch_mode": "immediate",
            "concurrency_mode": "sequential",
            "priority": "high",
            "branch_preference": "codex/state-wp-0049",
            "immediate_pickup": True,
            "notes": "start now",
        })

        assert r.status_code == 201, r.text
        body = r.json()
        assert body["workstream_id"] == ws["id"]
        assert body["launch_mode"] == "immediate"
        assert body["immediate_pickup"] is True
        assert body["status"] == "requested"

        r = await client.get(f"/workstreams/{ws['id']}")
        assert r.status_code == 200, r.text
        updated = r.json()
        # Lifecycle status stays "ready"; only the execution fields change.
        assert updated["status"] == "ready"
        assert updated["execution_state"] == "launching"
        assert updated["launch_mode"] == "immediate"

        r = await client.get(f"/execution/launch-requests?workstream_id={ws['id']}")
        assert r.status_code == 200, r.text
        assert len(r.json()) == 1
|
|
|
|
|
|
def _fabric_graph_export(generated_at="2026-05-23T12:00:00Z", extra_node=False):
|
|
nodes = [
|
|
{
|
|
"id": "the-custodian.state-hub",
|
|
"kind": "ServiceDeclaration",
|
|
"name": "State Hub",
|
|
"repo": "state-hub",
|
|
"domain": "custodian",
|
|
"lifecycle": "active",
|
|
"canon_category": "service",
|
|
"canon_anchor": "state-hub",
|
|
"mapping_fit": "direct",
|
|
"evidence_state": "declared",
|
|
"attributes": {"state_hub_repo_id": "state-hub"},
|
|
},
|
|
{
|
|
"id": "the-custodian.state-hub.http-api",
|
|
"kind": "InterfaceDeclaration",
|
|
"name": "State Hub HTTP API",
|
|
"repo": "state-hub",
|
|
"domain": "custodian",
|
|
"lifecycle": "active",
|
|
"canon_category": "interface",
|
|
"mapping_fit": "direct",
|
|
"evidence_state": "observed",
|
|
"attributes": {},
|
|
},
|
|
{
|
|
"id": "the-custodian.state-hub.coordination",
|
|
"kind": "CapabilityDeclaration",
|
|
"name": "Coordination",
|
|
"repo": "state-hub",
|
|
"domain": "custodian",
|
|
"lifecycle": "active",
|
|
"canon_category": "capability",
|
|
"mapping_fit": "direct",
|
|
"evidence_state": "declared",
|
|
"attributes": {},
|
|
},
|
|
]
|
|
edges = [
|
|
{
|
|
"from": "the-custodian.state-hub",
|
|
"to": "the-custodian.state-hub.http-api",
|
|
"type": "exposes",
|
|
"canonical_type": "exposes",
|
|
"canon_anchor": "state-hub-http",
|
|
"mapping_fit": "direct",
|
|
"display_only": False,
|
|
"evidence_state": "observed",
|
|
"attributes": {},
|
|
},
|
|
{
|
|
"from": "the-custodian.state-hub",
|
|
"to": "the-custodian.state-hub.coordination",
|
|
"type": "provides",
|
|
"canonical_type": "implements",
|
|
"mapping_fit": "direct",
|
|
"display_only": False,
|
|
"evidence_state": "declared",
|
|
"attributes": {},
|
|
},
|
|
{
|
|
"from": "the-custodian.state-hub.coordination",
|
|
"to": "the-custodian.state-hub.http-api",
|
|
"type": "depends_on",
|
|
"canonical_type": "depends_on",
|
|
"mapping_fit": "direct",
|
|
"display_only": False,
|
|
"evidence_state": "declared",
|
|
"attributes": {},
|
|
},
|
|
]
|
|
if extra_node:
|
|
nodes.append(
|
|
{
|
|
"id": "railiance.fabric.registry",
|
|
"kind": "ServiceDeclaration",
|
|
"name": "Railiance Fabric Registry",
|
|
"repo": "railiance-fabric",
|
|
"domain": "custodian",
|
|
"lifecycle": "active",
|
|
"canon_category": "service",
|
|
"mapping_fit": "direct",
|
|
"evidence_state": "observed",
|
|
"attributes": {},
|
|
}
|
|
)
|
|
edges.append(
|
|
{
|
|
"from": "railiance.fabric.registry",
|
|
"to": "the-custodian.state-hub",
|
|
"type": "exposes",
|
|
"canonical_type": "exposes",
|
|
"mapping_fit": "direct",
|
|
"display_only": False,
|
|
"evidence_state": "observed",
|
|
"attributes": {},
|
|
}
|
|
)
|
|
return {
|
|
"apiVersion": "railiance.fabric/v1alpha1",
|
|
"kind": "FabricGraphExport",
|
|
"generated_at": generated_at,
|
|
"source": {
|
|
"repo": "registry",
|
|
"commit": "abc123",
|
|
"path": ".railiance-fabric/registry.sqlite3",
|
|
},
|
|
"nodes": nodes,
|
|
"edges": edges,
|
|
}
|
|
|
|
|
|
def _financial_fabric_graph_export(generated_at="2026-05-24T00:00:00Z"):
|
|
return {
|
|
"apiVersion": "railiance.fabric/v1alpha2",
|
|
"kind": "FabricGraphExport",
|
|
"schema_version": "financial-fabric-v1",
|
|
"generated_at": generated_at,
|
|
"source": {
|
|
"producer": "railiance-fabric",
|
|
"registry": "registry",
|
|
"commit": "financial-example",
|
|
"generation_reason": "operator_refresh",
|
|
},
|
|
"compatibility": {
|
|
"legacy_v1alpha1_supported": True,
|
|
"breaking_reset": False,
|
|
},
|
|
"netkingdom": {
|
|
"id": "railiance.netkingdom",
|
|
"name": "Railiance Netkingdom",
|
|
"king_actor_id": "actor.railiance.king",
|
|
},
|
|
"actors": [
|
|
{
|
|
"id": "actor.railiance.king",
|
|
"kind": "FabricActor",
|
|
"role": "king",
|
|
"name": "Railiance King",
|
|
},
|
|
{
|
|
"id": "actor.railiance.primary-lord",
|
|
"kind": "FabricActor",
|
|
"role": "lord",
|
|
"name": "Railiance Primary Lord",
|
|
},
|
|
{
|
|
"id": "actor.coulomb.tenant",
|
|
"kind": "FabricActor",
|
|
"role": "tenant",
|
|
"name": "Coulomb Tenant",
|
|
},
|
|
],
|
|
"fabrics": [
|
|
{
|
|
"id": "fabric.railiance.primary",
|
|
"kind": "Fabric",
|
|
"name": "Railiance Primary Fabric",
|
|
"netkingdom_id": "railiance.netkingdom",
|
|
"lord_actor_id": "actor.railiance.primary-lord",
|
|
"parent_fabric_id": None,
|
|
"status": "active",
|
|
"boundary": {"boundary_type": "fabric"},
|
|
"evidence_refs": [],
|
|
},
|
|
{
|
|
"id": "subfabric.railiance.tenant.coulomb",
|
|
"kind": "Subfabric",
|
|
"name": "Coulomb Tenant Subfabric",
|
|
"netkingdom_id": "railiance.netkingdom",
|
|
"parent_fabric_id": "fabric.railiance.primary",
|
|
"tenant_actor_id": "actor.coulomb.tenant",
|
|
"status": "planned",
|
|
"boundary": {"boundary_type": "subfabric"},
|
|
"evidence_refs": [],
|
|
},
|
|
],
|
|
"nodes": [
|
|
{
|
|
"id": "state-hub.http",
|
|
"kind": "UtilityInterface",
|
|
"name": "State Hub HTTP API",
|
|
"repo": "state-hub",
|
|
"domain": "custodian",
|
|
"lifecycle": "active",
|
|
"containment": {
|
|
"netkingdom_id": "railiance.netkingdom",
|
|
"fabric_id": "fabric.railiance.primary",
|
|
"subfabric_id": None,
|
|
"environment": "local",
|
|
"deployment_scenario_id": None,
|
|
},
|
|
"ownership": {
|
|
"owner_actor_id": "actor.railiance.primary-lord",
|
|
"owner_role": "lord",
|
|
"resolution": "inherited",
|
|
"inherited_from": "fabric.railiance.primary",
|
|
"supporting_actor_ids": [],
|
|
},
|
|
"accounting": {
|
|
"cost_center_id": "cc.platform.shared",
|
|
"allocation_model": "direct",
|
|
},
|
|
"evidence": {
|
|
"state": "declared",
|
|
"review_state": "accepted",
|
|
"confidence": 0.9,
|
|
"refs": [],
|
|
},
|
|
"canon_category": "endpoint",
|
|
"canon_anchor": "model/network",
|
|
"mapping_fit": "partial",
|
|
"evidence_state": "declared",
|
|
"attributes": {},
|
|
},
|
|
{
|
|
"id": "coulomb.automation-client",
|
|
"kind": "Service",
|
|
"name": "Coulomb Automation Client",
|
|
"repo": "coulomb-automation",
|
|
"domain": "railiance",
|
|
"lifecycle": "planned",
|
|
"containment": {
|
|
"netkingdom_id": "railiance.netkingdom",
|
|
"fabric_id": "fabric.railiance.primary",
|
|
"subfabric_id": "subfabric.railiance.tenant.coulomb",
|
|
"environment": "local",
|
|
"deployment_scenario_id": None,
|
|
},
|
|
"ownership": {
|
|
"owner_actor_id": "actor.coulomb.tenant",
|
|
"owner_role": "tenant",
|
|
"resolution": "explicit",
|
|
"supporting_actor_ids": [],
|
|
},
|
|
"accounting": {
|
|
"cost_center_id": "cc.coulomb.automation",
|
|
"allocation_model": "direct",
|
|
},
|
|
"evidence": {
|
|
"state": "declared",
|
|
"review_state": "accepted",
|
|
"confidence": 0.8,
|
|
"refs": [],
|
|
},
|
|
"attributes": {},
|
|
},
|
|
],
|
|
"edges": [
|
|
{
|
|
"id": "utility:state-hub-http:coulomb-client",
|
|
"from": "state-hub.http",
|
|
"to": "coulomb.automation-client",
|
|
"type": "provides_utility_to",
|
|
"relationship_category": "utility",
|
|
"canonical_type": "depends_on",
|
|
"canon_anchor": "model/landscape",
|
|
"mapping_fit": "partial",
|
|
"display_only": False,
|
|
"evidence_state": "declared",
|
|
"provider": {
|
|
"owner_actor_id": "actor.railiance.primary-lord",
|
|
"fabric_id": "fabric.railiance.primary",
|
|
"subfabric_id": None,
|
|
},
|
|
"consumer": {
|
|
"owner_actor_id": "actor.coulomb.tenant",
|
|
"fabric_id": "fabric.railiance.primary",
|
|
"subfabric_id": "subfabric.railiance.tenant.coulomb",
|
|
},
|
|
"boundary": {
|
|
"crosses_fabric_boundary": False,
|
|
"crosses_subfabric_boundary": True,
|
|
},
|
|
"utility": {
|
|
"utility_type": "coordination_api",
|
|
"contract_id": "state-hub.http",
|
|
"payment_schema_id": "payment.internal-tenant-access",
|
|
"metering_basis": "unknown",
|
|
"business_model": "tenant_utility",
|
|
},
|
|
"accounting": {
|
|
"provider_profit_center_id": "pc.tenant-utilities",
|
|
"consumer_cost_center_id": "cc.coulomb.automation",
|
|
"allocation_model": "usage_weighted",
|
|
},
|
|
"evidence": {
|
|
"state": "declared",
|
|
"review_state": "accepted",
|
|
"confidence": 0.8,
|
|
"refs": [],
|
|
},
|
|
"attributes": {},
|
|
}
|
|
],
|
|
"unresolved": [],
|
|
}
|
|
|
|
|
|
class TestFabricGraphReadModel:
    """Tests for fabric graph export ingestion and the ``/fabric/graph`` read endpoints."""

    async def test_validation_failure_records_failed_import_without_read_model_rows(self, client):
        """An export without ``kind`` is rejected with 422 but still recorded as invalid."""
        payload = _fabric_graph_export()
        payload.pop("kind")

        r = await client.post("/fabric/graph-exports", json=payload)

        assert r.status_code == 422, r.text
        body = r.json()["detail"]
        assert body["validation_status"] == "invalid"
        assert body["import_id"]

        # The rejected import is listed, with no nodes or edges counted.
        r = await client.get("/fabric/graph-exports?validation_status=invalid")
        assert r.status_code == 200, r.text
        imports = r.json()
        assert len(imports) == 1
        assert imports[0]["node_count"] == 0
        assert imports[0]["edge_count"] == 0
        assert imports[0]["error_details"]["error"].startswith("invalid FabricGraphExport")

        r = await client.get("/fabric/graph/nodes")
        assert r.status_code == 404, r.text

        r = await client.get("/progress/?event_type=fabric_graph_import")
        assert r.status_code == 200, r.text
        assert "rejected" in r.json()[0]["summary"]

    async def test_idempotent_reingest_uses_canonical_graph_content_hash(self, client):
        """Re-posting the same graph with a different ``generated_at`` reuses the first import."""
        first = _fabric_graph_export(generated_at="2026-05-23T12:00:00Z")
        second = _fabric_graph_export(generated_at="2026-05-23T12:05:00Z")

        r = await client.post("/fabric/graph-exports", json=first)
        assert r.status_code == 200, r.text
        first_body = r.json()
        assert first_body["created"] is True
        assert first_body["idempotent"] is False

        r = await client.post("/fabric/graph-exports", json=second)
        assert r.status_code == 200, r.text
        second_body = r.json()
        assert second_body["created"] is False
        assert second_body["idempotent"] is True
        assert second_body["import_run"]["id"] == first_body["import_run"]["id"]

        # Only one import run and one set of nodes exist after both posts.
        r = await client.get("/fabric/graph-exports")
        assert r.status_code == 200, r.text
        assert len(r.json()) == 1
        r = await client.get("/fabric/graph/nodes")
        assert r.status_code == 200, r.text
        assert len(r.json()) == 3

    async def test_latest_import_selection_tracks_new_graph_content(self, client):
        """A second export with different content becomes the latest import."""
        r = await client.post("/fabric/graph-exports", json=_fabric_graph_export())
        assert r.status_code == 200, r.text
        first_id = r.json()["import_run"]["id"]

        r = await client.post("/fabric/graph-exports", json=_fabric_graph_export(extra_node=True))
        assert r.status_code == 200, r.text
        second_id = r.json()["import_run"]["id"]
        assert second_id != first_id

        r = await client.get("/fabric/graph-exports/latest")
        assert r.status_code == 200, r.text
        latest = r.json()
        assert latest["id"] == second_id
        assert latest["node_count"] == 4
        assert latest["edge_count"] == 4

        r = await client.get("/fabric/graph-exports")
        assert r.status_code == 200, r.text
        latest_flags = {row["id"]: row["is_latest"] for row in r.json()}
        assert latest_flags[first_id] is False
        assert latest_flags[second_id] is True

    async def test_read_only_queries_filter_graph_without_mutating_state_hub_entities(self, client):
        """Graph read queries filter correctly and leave progress, workstream and task unchanged."""
        await _create_domain(client)
        topic = await _create_topic(client)
        ws = await _create_workstream(client, topic["id"], status="ready")
        task = await _create_task(client, ws["id"])
        r = await client.post("/fabric/graph-exports", json=_fabric_graph_export(extra_node=True))
        assert r.status_code == 200, r.text

        # Baseline taken after ingestion, so only the read queries are measured.
        before_progress = await client.get("/progress/")
        assert before_progress.status_code == 200, before_progress.text
        before_progress_count = len(before_progress.json())

        r = await client.get("/fabric/graph/summary")
        assert r.status_code == 200, r.text
        summary = r.json()
        assert summary["node_count"] == 4
        assert summary["edge_count"] == 4
        assert summary["nodes_by_repo"]["state-hub"] == 3
        assert summary["edges_by_canonical_type"]["exposes"] == 2

        r = await client.get("/fabric/graph/nodes?repo=state-hub&canonical_category=service")
        assert r.status_code == 200, r.text
        assert [node["graph_id"] for node in r.json()] == ["the-custodian.state-hub"]

        for relationship in ("exposes", "depends_on", "implements"):
            r = await client.get(f"/fabric/graph/edges?canonical_relationship={relationship}")
            assert r.status_code == 200, r.text
            assert len(r.json()) >= 1

        after_progress = await client.get("/progress/")
        assert after_progress.status_code == 200, after_progress.text
        assert len(after_progress.json()) == before_progress_count

        r = await client.get(f"/workstreams/{ws['id']}")
        assert r.status_code == 200, r.text
        assert r.json()["status"] == "ready"
        r = await client.get(f"/tasks/{task['id']}")
        assert r.status_code == 200, r.text
        assert r.json()["status"] == "todo"

    async def test_financial_vnext_ingest_materializes_ownership_utility_and_summary(self, client):
        """A financial export is ingested idempotently and exposed via nodes, edges and summary."""
        r = await client.post("/fabric/graph-exports", json=_financial_fabric_graph_export())
        assert r.status_code == 200, r.text
        body = r.json()
        assert body["import_run"]["api_version"] == "railiance.fabric/v1alpha2"
        assert body["import_run"]["schema_version"] == "financial-fabric-v1"
        assert body["import_run"]["netkingdom_id"] == "railiance.netkingdom"
        assert body["import_run"]["actor_count"] == 3
        assert body["import_run"]["fabric_count"] == 2

        # Same content with a later generated_at must map to the same import run.
        r = await client.post(
            "/fabric/graph-exports",
            json=_financial_fabric_graph_export(generated_at="2026-05-24T00:05:00Z"),
        )
        assert r.status_code == 200, r.text
        second_body = r.json()
        assert second_body["created"] is False
        assert second_body["idempotent"] is True
        assert second_body["import_run"]["id"] == body["import_run"]["id"]

        r = await client.get("/fabric/graph/nodes?owner_role=tenant")
        assert r.status_code == 200, r.text
        tenant_nodes = r.json()
        assert [node["graph_id"] for node in tenant_nodes] == ["coulomb.automation-client"]
        assert tenant_nodes[0]["subfabric_id"] == "subfabric.railiance.tenant.coulomb"
        assert tenant_nodes[0]["owner_actor_id"] == "actor.coulomb.tenant"
        assert tenant_nodes[0]["ownership_resolution"] == "explicit"
        assert tenant_nodes[0]["cost_center_id"] == "cc.coulomb.automation"

        r = await client.get(
            "/fabric/graph/edges"
            "?relationship_category=utility"
            "&consumer_owner_actor_id=actor.coulomb.tenant"
            "&crosses_subfabric_boundary=true"
        )
        assert r.status_code == 200, r.text
        utility_edges = r.json()
        assert len(utility_edges) == 1
        assert utility_edges[0]["utility_type"] == "coordination_api"
        assert utility_edges[0]["utility_payment_schema_id"] == "payment.internal-tenant-access"
        assert utility_edges[0]["provider_profit_center_id"] == "pc.tenant-utilities"

        r = await client.get("/fabric/graph/summary")
        assert r.status_code == 200, r.text
        summary = r.json()
        assert summary["schema_version"] == "financial-fabric-v1"
        assert summary["nodes_by_fabric"]["fabric.railiance.primary"] == 2
        assert summary["nodes_by_subfabric"]["subfabric.railiance.tenant.coulomb"] == 1
        assert summary["nodes_by_owner_role"]["lord"] == 1
        assert summary["nodes_by_owner_role"]["tenant"] == 1
        assert summary["edges_by_relationship_category"]["utility"] == 1
        assert summary["utility_edges_by_provider_owner"]["actor.railiance.primary-lord"] == 1
        assert summary["utility_edges_by_consumer_owner"]["actor.coulomb.tenant"] == 1
        assert summary["utility_edges_by_business_model"]["tenant_utility"] == 1
        assert summary["tenant_utilities_without_payment_schema"] == 0
        assert summary["unresolved_ownership_count"] == 0

    async def test_financial_vnext_validation_rejects_unresolved_accepted_ownership(self, client):
        """An accepted node whose ownership resolution is ``unresolved`` is rejected with 422."""
        payload = _financial_fabric_graph_export()
        # The node keeps review_state "accepted"; only its resolution changes.
        payload["nodes"][0]["ownership"]["resolution"] = "unresolved"

        r = await client.post("/fabric/graph-exports", json=payload)

        assert r.status_code == 422, r.text
        body = r.json()["detail"]
        assert body["validation_status"] == "invalid"
        assert "accepted nodes" in body["message"]

        r = await client.get("/fabric/graph/nodes")
        assert r.status_code == 404, r.text

    async def test_legacy_fabric_exports_remain_compatible_with_null_financial_fields(self, client):
        """A legacy v1alpha1 export ingests with the financial fields left null or empty."""
        r = await client.post("/fabric/graph-exports", json=_fabric_graph_export())
        assert r.status_code == 200, r.text
        assert r.json()["import_run"]["schema_version"] is None

        r = await client.get("/fabric/graph/nodes?repo=state-hub&canonical_category=service")
        assert r.status_code == 200, r.text
        node = r.json()[0]
        assert node["graph_id"] == "the-custodian.state-hub"
        assert node["fabric_id"] is None
        assert node["owner_actor_id"] is None

        r = await client.get("/fabric/graph/summary")
        assert r.status_code == 200, r.text
        summary = r.json()
        assert summary["schema_version"] is None
        assert summary["nodes_by_fabric"] == {}
|