generated from coulomb/repo-seed
348 lines
14 KiB
Python
348 lines
14 KiB
Python
"""
|
|
Token events router tests.
|
|
|
|
Covers: create event, list with filters, summary aggregation (single task,
|
|
cross-workstream rollup, by-model breakdown).
|
|
"""
|
|
from __future__ import annotations

import uuid

import pytest
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
|
|
|
|
async def _create_domain(client, slug="testdomain"):
    """Create a domain via ``POST /domains/`` and return the decoded JSON body.

    The domain name is always "Test Domain"; only ``slug`` is configurable.
    Fails the calling test, showing the response text, unless the API
    answers with status 201.
    """
    payload = {"slug": slug, "name": "Test Domain"}
    response = await client.post("/domains/", json=payload)
    assert response.status_code == 201, response.text
    return response.json()
|
|
|
|
|
|
async def _create_topic(client, domain_slug="testdomain", slug="testtopic"):
    """Create a topic via ``POST /topics/`` and return the decoded JSON body.

    Args:
        client: Async HTTP test client exposing ``post``.
        domain_slug: Slug of the domain the topic is attached to (sent as
            the ``domain`` field).
        slug: Slug of the new topic. Defaults to "testtopic", the value that
            was previously hard-coded, so existing callers are unaffected;
            pass a different slug to create several topics in one test.

    Returns:
        The decoded JSON response body.

    Fails the calling test, showing the response text, unless the API
    answers with status 201.
    """
    payload = {"slug": slug, "title": "T", "domain": domain_slug}
    r = await client.post("/topics/", json=payload)
    assert r.status_code == 201, r.text
    return r.json()
|
|
|
|
|
|
async def _create_workstream(client, topic_id, slug="ws1"):
    """Create a workstream under ``topic_id`` via ``POST /workstreams/``.

    The title is always "WS". Returns the decoded JSON body and fails the
    calling test, showing the response text, unless the API answers 201.
    """
    payload = {"topic_id": topic_id, "slug": slug, "title": "WS"}
    response = await client.post("/workstreams/", json=payload)
    assert response.status_code == 201, response.text
    return response.json()
|
|
|
|
|
|
async def _create_task(client, workstream_id):
    """Create a task titled "task" in ``workstream_id`` via ``POST /tasks/``.

    Returns the decoded JSON body and fails the calling test, showing the
    response text, unless the API answers 201.
    """
    payload = {"workstream_id": workstream_id, "title": "task"}
    response = await client.post("/tasks/", json=payload)
    assert response.status_code == 201, response.text
    return response.json()
|
|
|
|
|
|
async def _post_event(client, tokens_in=100, tokens_out=50, **kwargs):
    """Record a token event via ``POST /token-events/`` and return its JSON.

    ``tokens_in`` and ``tokens_out`` are always sent; any extra keyword
    arguments are merged into the request body unchanged. Fails the calling
    test, showing the response text, unless the API answers 201.
    """
    payload = dict(tokens_in=tokens_in, tokens_out=tokens_out)
    payload.update(kwargs)
    response = await client.post("/token-events/", json=payload)
    assert response.status_code == 201, response.text
    return response.json()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
class TestTokenEventsCreate:
    """Tests for creating, upserting and patching token events.

    Exercises ``POST /token-events/``, ``POST /token-events/upsert`` and
    ``PATCH /token-events/{id}``.
    """

    async def test_create_minimal(self, client):
        """An event posted with only token counts comes back with a total and default provenance fields."""
        created = await _post_event(client, tokens_in=200, tokens_out=100)

        expected = {
            "tokens_in": 200,
            "tokens_out": 100,
            "tokens_total": 300,
            "measurement_kind": "estimated",
            "source_provider": "manual",
            "raw_total_tokens": 300,
        }
        for field, value in expected.items():
            assert created[field] == value, field
        assert created["id"] is not None

    async def test_create_with_created_at_backfill_timestamp(self, client):
        """A caller-supplied ``created_at`` is echoed back (compared to the second)."""
        payload = {
            "tokens_in": 200,
            "tokens_out": 100,
            "ref_type": "session",
            "ref_id": "codex:test-session",
            "created_at": "2026-05-19T01:02:03Z",
        }
        created = await _post_event(client, **payload)
        # Prefix match only: the response's suffix/zone formatting is not pinned.
        assert created["created_at"].startswith("2026-05-19T01:02:03")

    async def test_create_with_all_fields(self, client):
        """An event carrying every optional field round-trips the asserted ones."""
        await _create_domain(client)
        topic = await _create_topic(client)
        workstream = await _create_workstream(client, topic["id"])
        task = await _create_task(client, workstream["id"])

        payload = {
            "tokens_in": 1000,
            "tokens_out": 500,
            "task_id": task["id"],
            "model": "claude-sonnet-4-6",
            "agent": "custodian",
            "ref_type": "task",
            "ref_id": task["id"],
            "note": "T01 done",
            "session_id": "ses-abc",
            "measurement_kind": "measured",
            "source_provider": "manual",
            "source_id": "manual:test-event",
            "confidence": 0.95,
            "cached_input_tokens": 10,
            "reasoning_output_tokens": 20,
            "raw_total_tokens": 1530,
            "raw_metadata": {"source": "unit-test"},
        }
        created = await _post_event(client, **payload)

        expected = {
            "task_id": task["id"],
            # workstream_id is not sent above; it is auto-populated from task
            "workstream_id": workstream["id"],
            "model": "claude-sonnet-4-6",
            "tokens_total": 1500,
            "measurement_kind": "measured",
            "source_provider": "manual",
            "source_id": "manual:test-event",
            "cached_input_tokens": 10,
            "reasoning_output_tokens": 20,
            # Same value as the raw_total_tokens supplied in the payload.
            "token_evidence_total": 1530,
            "raw_metadata": {"source": "unit-test"},
        }
        for field, value in expected.items():
            assert created[field] == value, field

    async def test_upsert_source_event_updates_existing_session(self, client):
        """Upserting the same source identity twice updates one event instead of adding a second."""
        original = {
            "tokens_in": 100,
            "tokens_out": 50,
            "measurement_kind": "measured",
            "source_provider": "codex_session",
            "source_id": "codex:abc",
            "ref_type": "session",
            "ref_id": "codex:abc",
            "session_id": "abc",
            "cached_input_tokens": 5,
        }
        revised = {**original, "tokens_in": 300, "tokens_out": 80}

        first = await client.post("/token-events/upsert", json=original)
        assert first.status_code == 200, first.text
        second = await client.post("/token-events/upsert", json=revised)
        assert second.status_code == 200, second.text

        first_event = first.json()
        second_event = second.json()
        assert first_event["id"] == second_event["id"]
        assert second_event["tokens_total"] == 380

        listing = await client.get("/token-events/", params={"source_provider": "codex_session"})
        assert len(listing.json()) == 1

    async def test_patch_backfill_fields(self, client):
        """PATCH can rewrite token counts, references, timestamp and provenance of an event."""
        created = await _post_event(client, tokens_in=100, tokens_out=50)

        changes = {
            "tokens_in": 500,
            "tokens_out": 250,
            "session_id": "codex-session",
            "ref_type": "session",
            "ref_id": "codex:session",
            "created_at": "2026-05-20T01:02:03Z",
            "note": "backfill:codex-session",
            "measurement_kind": "measured",
            "source_provider": "codex_session",
            "source_id": "codex:session",
            "cached_input_tokens": 10,
        }
        response = await client.patch(f"/token-events/{created['id']}", json=changes)
        assert response.status_code == 200

        patched = response.json()
        expected = {
            "tokens_total": 750,
            "session_id": "codex-session",
            "ref_type": "session",
            "ref_id": "codex:session",
            "measurement_kind": "measured",
            "source_provider": "codex_session",
            "source_id": "codex:session",
            "cached_input_tokens": 10,
        }
        for field, value in expected.items():
            assert patched[field] == value, field
        assert patched["created_at"].startswith("2026-05-20T01:02:03")

    async def test_workstream_auto_populated_from_task(self, client):
        """Posting with only ``task_id`` yields the task's workstream on the event."""
        await _create_domain(client)
        topic = await _create_topic(client)
        workstream = await _create_workstream(client, topic["id"])
        task = await _create_task(client, workstream["id"])

        created = await _post_event(client, task_id=task["id"])
        assert created["workstream_id"] == workstream["id"]
|
|
|
|
|
|
@pytest.mark.asyncio
class TestTokenEventsList:
    """Tests for ``GET /token-events/`` with and without query filters."""

    async def test_list_empty(self, client):
        """With no events posted in this test, the listing is an empty array."""
        response = await client.get("/token-events/")
        assert response.status_code == 200
        assert response.json() == []

    async def test_list_returns_events(self, client):
        """Two posted events produce two entries in the unfiltered listing."""
        for tokens_in, tokens_out in ((100, 50), (200, 100)):
            await _post_event(client, tokens_in=tokens_in, tokens_out=tokens_out)

        response = await client.get("/token-events/")
        assert response.status_code == 200
        assert len(response.json()) == 2

    async def test_filter_by_task_id(self, client):
        """The ``task_id`` filter returns only the event attached to that task."""
        await _create_domain(client)
        topic = await _create_topic(client)
        workstream = await _create_workstream(client, topic["id"])
        task = await _create_task(client, workstream["id"])

        await _post_event(client, task_id=task["id"], tokens_in=100, tokens_out=50)
        # Second event has no task_id and must be excluded by the filter.
        await _post_event(client, tokens_in=200, tokens_out=100)

        response = await client.get("/token-events/", params={"task_id": task["id"]})
        assert response.status_code == 200
        matches = response.json()
        assert len(matches) == 1
        assert matches[0]["task_id"] == task["id"]

    async def test_filter_by_model(self, client):
        """The ``model`` filter returns only events recorded for that model."""
        wanted = "claude-sonnet-4-6"
        for model, tokens_in, tokens_out in (
            (wanted, 100, 50),
            ("claude-opus-4-6", 200, 100),
        ):
            await _post_event(client, model=model, tokens_in=tokens_in, tokens_out=tokens_out)

        response = await client.get("/token-events/", params={"model": wanted})
        assert response.status_code == 200
        matches = response.json()
        assert len(matches) == 1
        assert matches[0]["model"] == "claude-sonnet-4-6"

    async def test_filter_by_measurement_kind_and_source_provider(self, client):
        """Combining both provenance filters returns only the measured codex event."""
        measured_event = {
            "tokens_in": 100,
            "tokens_out": 50,
            "measurement_kind": "measured",
            "source_provider": "codex_session",
            "source_id": "codex:filter",
        }
        await _post_event(client, **measured_event)
        # Posted without provenance fields; expected to be filtered out.
        await _post_event(client, tokens_in=200, tokens_out=100, note="heuristic")

        filters = {"measurement_kind": "measured", "source_provider": "codex_session"}
        response = await client.get("/token-events/", params=filters)
        assert response.status_code == 200
        matches = response.json()
        assert len(matches) == 1
        assert matches[0]["source_id"] == "codex:filter"
|
|
|
|
|
|
@pytest.mark.asyncio
class TestTokenSummary:
    """Tests for ``GET /token-events/summary/`` across task, workstream and session scopes."""

    async def test_summary_single_task(self, client):
        """Two events on one task are summed, counted and broken down by model."""
        await _create_domain(client)
        topic = await _create_topic(client)
        ws = await _create_workstream(client, topic["id"])
        task = await _create_task(client, ws["id"])

        await _post_event(client, task_id=task["id"], tokens_in=500, tokens_out=300, model="model-a")
        await _post_event(client, task_id=task["id"], tokens_in=100, tokens_out=50, model="model-a")

        r = await client.get("/token-events/summary/", params={"scope": "task", "id": task["id"]})
        assert r.status_code == 200
        s = r.json()
        assert s["scope"] == "task"
        assert s["tokens_in"] == 600
        assert s["tokens_out"] == 350
        assert s["tokens_total"] == 950
        assert s["event_count"] == 2
        assert "model-a" in s["by_model"]
        assert s["by_model"]["model-a"] == 950

    async def test_summary_workstream_rollup(self, client):
        """Events from two tasks in one workstream roll up into the workstream summary."""
        await _create_domain(client)
        topic = await _create_topic(client)
        ws = await _create_workstream(client, topic["id"])
        task1 = await _create_task(client, ws["id"])
        task2 = await _create_task(client, ws["id"])

        await _post_event(client, task_id=task1["id"], tokens_in=1000, tokens_out=500)
        await _post_event(client, task_id=task2["id"], workstream_id=ws["id"], tokens_in=200, tokens_out=100)

        r = await client.get("/token-events/summary/", params={"scope": "workstream", "id": ws["id"]})
        assert r.status_code == 200
        s = r.json()
        # task1 auto-populates workstream_id; task2 explicitly sets it
        # NOTE(review): this is only a lower bound. The two events above sum to
        # exactly 1800, so `==` would be a stronger check if each test runs
        # against an isolated store — confirm before tightening.
        assert s["tokens_total"] >= 1800

    async def test_summary_by_model_breakdown(self, client):
        """Session-scoped summary counts only the tagged event and breaks it down by model."""
        # Events with no ref_type/ref_id: they must not leak into the
        # session-scoped summary requested below.
        await _post_event(client, model="sonnet", tokens_in=300, tokens_out=200, agent="custodian")
        await _post_event(client, model="opus", tokens_in=100, tokens_out=50, agent="ralph")
        await _post_event(client, model="sonnet", tokens_in=200, tokens_out=100)

        # The only event tagged with the session reference being summarised.
        await _post_event(
            client, model="sonnet", tokens_in=50, tokens_out=25,
            ref_type="session", ref_id="ses-001",
        )
        r = await client.get("/token-events/summary/", params={"scope": "session", "id": "ses-001"})
        assert r.status_code == 200
        s = r.json()
        assert s["event_count"] == 1
        assert s["tokens_total"] == 75
        # by_model maps model name -> total tokens (same shape as asserted in
        # test_summary_single_task); only the tagged "sonnet" event counts.
        assert s["by_model"]["sonnet"] == 75
        assert s["by_measurement_kind"]["estimated"] == 75

    async def test_summary_unknown_scope_returns_422(self, client):
        """An unrecognised ``scope`` value is rejected with status 422."""
        r = await client.get("/token-events/summary/", params={"scope": "foobar", "id": "x"})
        assert r.status_code == 422

    async def test_summary_empty_scope_returns_zeros(self, client):
        """Summarising a task id with no events returns zero totals, not an error."""
        # A freshly generated UUID that no event in this test refers to.
        r = await client.get("/token-events/summary/", params={"scope": "task", "id": str(uuid.uuid4())})
        assert r.status_code == 200
        s = r.json()
        assert s["tokens_total"] == 0
        assert s["event_count"] == 0
|
|
|
|
|
|
@pytest.mark.asyncio
class TestTokenEventGetById:
    """Tests for ``GET /token-events/{id}``."""

    async def test_get_by_id(self, client):
        """A created event can be fetched by id with its counts and note intact."""
        ev = await _post_event(client, tokens_in=111, tokens_out=222, note="get-by-id-test")
        r = await client.get(f"/token-events/{ev['id']}")
        assert r.status_code == 200
        result = r.json()
        assert result["id"] == ev["id"]
        assert result["tokens_in"] == 111
        assert result["tokens_out"] == 222
        assert result["tokens_total"] == 333
        assert result["note"] == "get-by-id-test"

    async def test_get_by_id_not_found(self, client):
        """Fetching a freshly generated, never-stored UUID yields status 404."""
        r = await client.get(f"/token-events/{uuid.uuid4()}")
        assert r.status_code == 404
|
|
|
|
|
|
@pytest.mark.asyncio
class TestTokenAggregateAndQuality:
    """Tests for ``GET /token-events/aggregate/`` and ``GET /token-events/quality/``."""

    async def test_aggregate_and_quality_expose_evidence_breakdown(self, client):
        """One measured and one unprovenanced event are split by kind and provider."""
        measured_event = {
            "tokens_in": 100,
            "tokens_out": 50,
            "measurement_kind": "measured",
            "source_provider": "codex_session",
            "source_id": "codex:agg",
        }
        await _post_event(client, **measured_event)
        await _post_event(client, tokens_in=1000, tokens_out=500, note="heuristic")

        agg_response = await client.get("/token-events/aggregate/", params={"include_superseded": "false"})
        totals = agg_response.json()
        assert totals["tokens_total"] == 1650

        by_kind = totals["by_measurement_kind"]
        assert by_kind["measured"] == 150
        assert by_kind["estimated"] == 1500

        by_provider = totals["by_source_provider"]
        assert by_provider["codex_session"] == 150
        # NOTE(review): the second event is posted without a source_provider,
        # yet it is expected under "task_fallback" here rather than "manual"
        # — presumably the aggregate endpoint re-buckets it; confirm.
        assert by_provider["task_fallback"] == 1500

        measured_response = await client.get("/token-events/aggregate/", params={"measurement_kind": "measured"})
        assert measured_response.json()["tokens_total"] == 150

        quality_response = await client.get("/token-events/quality/")
        report = quality_response.json()
        assert report["measured_event_count"] == 1
        assert report["fallback_event_count"] == 1
        assert report["missing_provenance_event_count"] == 0
|