Files
state-hub/tests/test_token_events.py
tegwick b28298a2ec feat(dashboard): entity list UX — REF column, name cells, detail pages (CUST-WP-0030)
- ref-cell.js: REF column component — click=copy deeplink, dblclick=open
- field-help.js: field registry + fieldRow helper with help-tip decoration;
  FK fields (task_id, workstream_id, repo_id) render as async-linked cells
  with entity-title bubble-help on hover
- GET /token-events/{id} endpoint + get-by-id tests
- GET /repos/by-id/{repo_id} UUID lookup endpoint
- Landing pages: /token-events/[id], /workstreams/[id], /repos/[slug], /tasks/[id]
- token-cost.md: REF + Name columns on all three tables; parallel fetch of
  workstreams/tasks for title resolution
- reference.md: entity detail page URL scheme documented

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-29 22:35:35 +02:00

218 lines
8.4 KiB
Python

"""
Token events router tests.
Covers: create event, list with filters, summary aggregation (single task,
cross-workstream rollup, by-model breakdown).
"""
from __future__ import annotations
import pytest
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
async def _create_domain(client, slug="testdomain"):
r = await client.post("/domains/", json={"slug": slug, "name": "Test Domain"})
assert r.status_code == 201, r.text
return r.json()
async def _create_topic(client, domain_slug="testdomain"):
r = await client.post("/topics/", json={"slug": "testtopic", "title": "T", "domain": domain_slug})
assert r.status_code == 201, r.text
return r.json()
async def _create_workstream(client, topic_id, slug="ws1"):
r = await client.post("/workstreams/", json={"topic_id": topic_id, "slug": slug, "title": "WS"})
assert r.status_code == 201, r.text
return r.json()
async def _create_task(client, workstream_id):
r = await client.post("/tasks/", json={"workstream_id": workstream_id, "title": "task"})
assert r.status_code == 201, r.text
return r.json()
async def _post_event(client, tokens_in=100, tokens_out=50, **kwargs):
body = {"tokens_in": tokens_in, "tokens_out": tokens_out, **kwargs}
r = await client.post("/token-events/", json=body)
assert r.status_code == 201, r.text
return r.json()
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
class TestTokenEventsCreate:
async def test_create_minimal(self, client):
ev = await _post_event(client, tokens_in=200, tokens_out=100)
assert ev["tokens_in"] == 200
assert ev["tokens_out"] == 100
assert ev["tokens_total"] == 300
assert ev["id"] is not None
async def test_create_with_all_fields(self, client):
await _create_domain(client)
topic = await _create_topic(client)
ws = await _create_workstream(client, topic["id"])
task = await _create_task(client, ws["id"])
ev = await _post_event(
client,
tokens_in=1000,
tokens_out=500,
task_id=task["id"],
model="claude-sonnet-4-6",
agent="custodian",
ref_type="task",
ref_id=task["id"],
note="T01 done",
session_id="ses-abc",
)
assert ev["task_id"] == task["id"]
assert ev["workstream_id"] == ws["id"] # auto-populated from task
assert ev["model"] == "claude-sonnet-4-6"
assert ev["tokens_total"] == 1500
async def test_workstream_auto_populated_from_task(self, client):
await _create_domain(client)
topic = await _create_topic(client)
ws = await _create_workstream(client, topic["id"])
task = await _create_task(client, ws["id"])
ev = await _post_event(client, task_id=task["id"])
assert ev["workstream_id"] == ws["id"]
@pytest.mark.asyncio
class TestTokenEventsList:
async def test_list_empty(self, client):
r = await client.get("/token-events/")
assert r.status_code == 200
assert r.json() == []
async def test_list_returns_events(self, client):
await _post_event(client, tokens_in=100, tokens_out=50)
await _post_event(client, tokens_in=200, tokens_out=100)
r = await client.get("/token-events/")
assert r.status_code == 200
assert len(r.json()) == 2
async def test_filter_by_task_id(self, client):
await _create_domain(client)
topic = await _create_topic(client)
ws = await _create_workstream(client, topic["id"])
task = await _create_task(client, ws["id"])
await _post_event(client, task_id=task["id"], tokens_in=100, tokens_out=50)
await _post_event(client, tokens_in=200, tokens_out=100) # unrelated
r = await client.get("/token-events/", params={"task_id": task["id"]})
assert r.status_code == 200
events = r.json()
assert len(events) == 1
assert events[0]["task_id"] == task["id"]
async def test_filter_by_model(self, client):
await _post_event(client, model="claude-sonnet-4-6", tokens_in=100, tokens_out=50)
await _post_event(client, model="claude-opus-4-6", tokens_in=200, tokens_out=100)
r = await client.get("/token-events/", params={"model": "claude-sonnet-4-6"})
assert r.status_code == 200
events = r.json()
assert len(events) == 1
assert events[0]["model"] == "claude-sonnet-4-6"
@pytest.mark.asyncio
class TestTokenSummary:
async def test_summary_single_task(self, client):
await _create_domain(client)
topic = await _create_topic(client)
ws = await _create_workstream(client, topic["id"])
task = await _create_task(client, ws["id"])
await _post_event(client, task_id=task["id"], tokens_in=500, tokens_out=300, model="model-a")
await _post_event(client, task_id=task["id"], tokens_in=100, tokens_out=50, model="model-a")
r = await client.get("/token-events/summary/", params={"scope": "task", "id": task["id"]})
assert r.status_code == 200
s = r.json()
assert s["scope"] == "task"
assert s["tokens_in"] == 600
assert s["tokens_out"] == 350
assert s["tokens_total"] == 950
assert s["event_count"] == 2
assert "model-a" in s["by_model"]
assert s["by_model"]["model-a"] == 950
async def test_summary_workstream_rollup(self, client):
await _create_domain(client)
topic = await _create_topic(client)
ws = await _create_workstream(client, topic["id"])
task1 = await _create_task(client, ws["id"])
task2 = await _create_task(client, ws["id"])
await _post_event(client, task_id=task1["id"], tokens_in=1000, tokens_out=500)
await _post_event(client, task_id=task2["id"], workstream_id=ws["id"], tokens_in=200, tokens_out=100)
r = await client.get("/token-events/summary/", params={"scope": "workstream", "id": ws["id"]})
assert r.status_code == 200
s = r.json()
# task1 auto-populates workstream_id; task2 explicitly sets it
assert s["tokens_total"] >= 1800
async def test_summary_by_model_breakdown(self, client):
await _post_event(client, model="sonnet", tokens_in=300, tokens_out=200, agent="custodian")
await _post_event(client, model="opus", tokens_in=100, tokens_out=50, agent="ralph")
await _post_event(client, model="sonnet", tokens_in=200, tokens_out=100)
# Use workstream_id scope via events directly tagged with workstream
# Instead, just check the ref_type/ref_id scope path
await _post_event(
client, model="sonnet", tokens_in=50, tokens_out=25,
ref_type="session", ref_id="ses-001",
)
r = await client.get("/token-events/summary/", params={"scope": "session", "id": "ses-001"})
assert r.status_code == 200
s = r.json()
assert s["event_count"] == 1
assert s["tokens_total"] == 75
async def test_summary_unknown_scope_returns_422(self, client):
r = await client.get("/token-events/summary/", params={"scope": "foobar", "id": "x"})
assert r.status_code == 422
async def test_summary_empty_scope_returns_zeros(self, client):
import uuid
r = await client.get("/token-events/summary/", params={"scope": "task", "id": str(uuid.uuid4())})
assert r.status_code == 200
s = r.json()
assert s["tokens_total"] == 0
assert s["event_count"] == 0
@pytest.mark.asyncio
class TestTokenEventGetById:
async def test_get_by_id(self, client):
ev = await _post_event(client, tokens_in=111, tokens_out=222, note="get-by-id-test")
r = await client.get(f"/token-events/{ev['id']}")
assert r.status_code == 200
result = r.json()
assert result["id"] == ev["id"]
assert result["tokens_in"] == 111
assert result["tokens_out"] == 222
assert result["tokens_total"] == 333
assert result["note"] == "get-by-id-test"
async def test_get_by_id_not_found(self, client):
import uuid
r = await client.get(f"/token-events/{uuid.uuid4()}")
assert r.status_code == 404