# generated from coulomb/repo-seed
#
# Introduces end-to-end token consumption tracking so agent work is visible as
# a cost/effort metric alongside tasks and workplans.
#
# - Migration o2j3k4l5m6n7: token_events table with FK indexes on task_id,
#   workstream_id, repo_id, created_at
# - ORM model, Pydantic schemas (TokenEventCreate, TokenEventRead with computed
#   tokens_total, TokenSummary)
# - Router: POST /token-events/, GET /token-events/ (7 filters),
#   GET /token-events/summary/ (task|workstream|repo|commit|release scope)
# - MCP tools: record_token_event, get_token_summary (formatted table)
# - update_task_status enriched with optional tokens_in/tokens_out passthrough —
#   one call creates status update + token event
# - Dashboard token-cost.md page: by-repo bar, by-workplan table, by-model bar,
#   top-10 tasks by tokens
# - ralph-workplan skill updated with token reporting guidance and per-task
#   heuristics for estimating counts
# - Tests: test_token_events.py + test_token_passthrough.py (182 pass)
#
# Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
#
# 82 lines · 2.9 KiB · Python
"""
Token passthrough test: update_task_status with tokens_in/tokens_out
creates a token event automatically.

Tests the API-level behaviour (the MCP tool delegates to the same endpoints).
"""
|
|
from __future__ import annotations
|
|
|
|
import pytest
|
|
|
|
|
|
async def _create_domain(client, slug="td", name="D"):
    """Create a domain via ``POST /domains/`` and return the decoded response.

    Args:
        client: Async HTTP test client; only an awaitable
            ``post(url, json=...)`` is required of it.
        slug: Slug sent for the new domain.
        name: Display name sent for the new domain.

    Returns:
        The response body as decoded by ``r.json()``.

    Raises:
        AssertionError: If the endpoint does not answer 201. The response
            text is used as the failure message.
    """
    r = await client.post("/domains/", json={"slug": slug, "name": name})
    # Fail here, with the server's own explanation, rather than later in the
    # test body where a broken fixture is much harder to diagnose.
    assert r.status_code == 201, r.text
    return r.json()
|
|
|
|
|
|
async def _create_topic(client, domain_slug="td", slug="tp", title="T"):
    """Create a topic via ``POST /topics/`` and return the decoded response.

    Args:
        client: Async HTTP test client; only an awaitable
            ``post(url, json=...)`` is required of it.
        domain_slug: Slug of the domain the topic is attached to, sent as
            the ``domain`` field.
        slug: Slug sent for the new topic.
        title: Title sent for the new topic.

    Returns:
        The response body as decoded by ``r.json()``.

    Raises:
        AssertionError: If the endpoint does not answer 201. The response
            text is used as the failure message.
    """
    payload = {"slug": slug, "title": title, "domain": domain_slug}
    r = await client.post("/topics/", json=payload)
    # Surface the server's response body when fixture creation is rejected.
    assert r.status_code == 201, r.text
    return r.json()
|
|
|
|
|
|
async def _create_workstream(client, topic_id, slug="ws", title="WS"):
    """Create a workstream via ``POST /workstreams/`` and return the response.

    Args:
        client: Async HTTP test client; only an awaitable
            ``post(url, json=...)`` is required of it.
        topic_id: Identifier of the owning topic, sent as ``topic_id``.
        slug: Slug sent for the new workstream.
        title: Title sent for the new workstream.

    Returns:
        The response body as decoded by ``r.json()``.

    Raises:
        AssertionError: If the endpoint does not answer 201. The response
            text is used as the failure message.
    """
    payload = {"topic_id": topic_id, "slug": slug, "title": title}
    r = await client.post("/workstreams/", json=payload)
    # Surface the server's response body when fixture creation is rejected.
    assert r.status_code == 201, r.text
    return r.json()
|
|
|
|
|
|
async def _create_task(client, workstream_id, title="my task"):
    """Create a task via ``POST /tasks/`` and return the decoded response.

    Args:
        client: Async HTTP test client; only an awaitable
            ``post(url, json=...)`` is required of it.
        workstream_id: Identifier of the owning workstream, sent as
            ``workstream_id``.
        title: Title sent for the new task.

    Returns:
        The response body as decoded by ``r.json()``.

    Raises:
        AssertionError: If the endpoint does not answer 201. The response
            text is used as the failure message.
    """
    payload = {"workstream_id": workstream_id, "title": title}
    r = await client.post("/tasks/", json=payload)
    # Surface the server's response body when fixture creation is rejected.
    assert r.status_code == 201, r.text
    return r.json()
|
|
|
|
|
|
@pytest.mark.asyncio
class TestTokenPassthrough:
    """API-level checks for the token passthrough on ``PATCH /tasks/{id}``.

    Each test builds its own domain -> topic -> workstream -> task chain
    through the public endpoints, patches the task, then reads
    ``GET /token-events/`` filtered by ``task_id`` to see what was recorded.
    """

    @staticmethod
    async def _seed_task(client):
        """Create the entity chain a test needs and return ``(workstream, task)``.

        Both values are the decoded JSON bodies returned by the create
        endpoints; the tests read ``["id"]`` from each.
        """
        await _create_domain(client)
        topic = await _create_topic(client)
        ws = await _create_workstream(client, topic["id"])
        task = await _create_task(client, ws["id"])
        return ws, task

    async def test_update_status_with_tokens_creates_event(self, client):
        """PATCH /tasks/{id} with tokens_in/tokens_out creates a token_event."""
        ws, task = await self._seed_task(client)

        # Update task status with token data
        r = await client.patch(
            f"/tasks/{task['id']}",
            json={
                "status": "done",
                "tokens_in": 1200,
                "tokens_out": 800,
                "model": "claude-sonnet-4-6",
                "agent": "custodian",
            },
        )
        assert r.status_code == 200, r.text
        assert r.json()["status"] == "done"

        # Token event should now exist for this task
        r2 = await client.get("/token-events/", params={"task_id": task["id"]})
        assert r2.status_code == 200, r2.text
        events = r2.json()
        assert len(events) == 1

        ev = events[0]
        assert ev["tokens_in"] == 1200
        assert ev["tokens_out"] == 800
        # tokens_total is expected to be the sum of the two counts sent above.
        assert ev["tokens_total"] == 2000
        assert ev["model"] == "claude-sonnet-4-6"
        assert ev["agent"] == "custodian"
        # The event must be attributed to the task's workstream even though
        # the PATCH body never mentioned it.
        assert ev["workstream_id"] == ws["id"]

    async def test_update_status_without_tokens_creates_no_event(self, client):
        """PATCH /tasks/{id} without token fields creates no token_event."""
        _ws, task = await self._seed_task(client)

        r = await client.patch(f"/tasks/{task['id']}", json={"status": "in_progress"})
        assert r.status_code == 200, r.text

        r2 = await client.get("/token-events/", params={"task_id": task["id"]})
        assert r2.status_code == 200, r2.text
        assert r2.json() == []
|