generated from coulomb/repo-seed
Replace the ad-hoc coordination-domain spine with the Repo Classification Standard: 14 market domains, classification columns on managed_repos, and workplans anchored by repo_id (topic_id optional). - Add Alembic migration d8e9f0a1b2c3 with data backfill and workstream→workplan rename - Add api/classification.py validation and register-from-classification tooling - Expose workplan-first REST/MCP surface with legacy workstream aliases - Add C-24 consistency rule and legacy domain frontmatter mapping - Update dashboard repos page with category/capability/stake filters - Update orientation docs; mark STATE-WP-0065 finished
183 lines
7.2 KiB
Python
183 lines
7.2 KiB
Python
"""
|
|
Token passthrough test: update_task_status creates a token event on done.
|
|
|
|
Three-tier logic:
|
|
Tier 1 — exact tokens_in/tokens_out provided
|
|
Tier 2 — workplan_tokens_in/out provided → prorated by task count (note="workplan")
|
|
Tier 3 — no token args, status=done → heuristic 1000/500 (note="heuristic")
|
|
|
|
Non-done status changes never create a token event.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import pytest
|
|
|
|
|
|
async def _create_domain(client, slug="td"):
|
|
r = await client.post("/domains/", json={"slug": slug, "name": "D"})
|
|
assert r.status_code == 201, r.text
|
|
return r.json()
|
|
|
|
|
|
async def _create_topic(client, domain_slug="td"):
|
|
r = await client.post("/topics/", json={"slug": "tp", "title": "T", "domain": domain_slug})
|
|
assert r.status_code == 201, r.text
|
|
return r.json()
|
|
|
|
|
|
from tests.conftest import create_test_repo, create_test_workplan
|
|
|
|
|
|
async def _create_workstream(client, topic_id, domain_slug="td"):
|
|
repo = await create_test_repo(client, domain_slug=domain_slug, slug="td-repo")
|
|
return await create_test_workplan(client, repo_id=repo["id"], topic_id=topic_id, slug="ws", title="WS")
|
|
|
|
|
|
async def _create_task(client, workplan_id, title="my task"):
|
|
r = await client.post("/tasks/", json={"workplan_id": workplan_id, "title": title})
|
|
assert r.status_code == 201, r.text
|
|
return r.json()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
class TestTokenPassthrough:
|
|
async def test_tier1_exact_tokens(self, client):
|
|
"""Tier 1: exact tokens_in/tokens_out → used as-is, no note."""
|
|
await _create_domain(client)
|
|
topic = await _create_topic(client)
|
|
ws = await _create_workstream(client, topic["id"])
|
|
task = await _create_task(client, ws["id"])
|
|
|
|
r = await client.patch(f"/tasks/{task['id']}", json={
|
|
"status": "done",
|
|
"tokens_in": 1200,
|
|
"tokens_out": 800,
|
|
"model": "claude-sonnet-4-6",
|
|
"agent": "custodian",
|
|
})
|
|
assert r.status_code == 200
|
|
assert r.json()["status"] == "done"
|
|
|
|
events = (await client.get("/token-events/", params={"task_id": task["id"]})).json()
|
|
assert len(events) == 1
|
|
ev = events[0]
|
|
assert ev["tokens_in"] == 1200
|
|
assert ev["tokens_out"] == 800
|
|
assert ev["tokens_total"] == 2000
|
|
assert ev["model"] == "claude-sonnet-4-6"
|
|
assert ev["agent"] == "custodian"
|
|
assert ev["workstream_id"] == ws["id"]
|
|
assert ev["note"] == "measured"
|
|
assert ev["measurement_kind"] == "measured"
|
|
assert ev["source_provider"] == "manual"
|
|
assert ev["source_id"] == f"task:{task['id']}:manual"
|
|
|
|
async def test_tier1_userbased_note_override(self, client):
|
|
"""Tier 1 with note='userbased' records that note instead of 'measured'."""
|
|
await _create_domain(client)
|
|
topic = await _create_topic(client)
|
|
ws = await _create_workstream(client, topic["id"])
|
|
task = await _create_task(client, ws["id"])
|
|
|
|
r = await client.patch(f"/tasks/{task['id']}", json={
|
|
"status": "done",
|
|
"tokens_in": 500,
|
|
"tokens_out": 200,
|
|
"token_note": "userbased",
|
|
})
|
|
assert r.status_code == 200
|
|
|
|
events = (await client.get("/token-events/", params={"task_id": task["id"]})).json()
|
|
assert events[0]["note"] == "userbased"
|
|
assert events[0]["measurement_kind"] == "measured"
|
|
|
|
async def test_tier2_workplan_prorated(self, client):
|
|
"""Tier 2: workplan totals prorated across 4 tasks → 250/125 each, note='workplan'."""
|
|
await _create_domain(client)
|
|
topic = await _create_topic(client)
|
|
ws = await _create_workstream(client, topic["id"])
|
|
# Create 4 tasks; mark the first done with workplan totals
|
|
task = await _create_task(client, ws["id"], "T1")
|
|
for title in ["T2", "T3", "T4"]:
|
|
await _create_task(client, ws["id"], title)
|
|
|
|
r = await client.patch(f"/tasks/{task['id']}", json={
|
|
"status": "done",
|
|
"workplan_tokens_in": 1000,
|
|
"workplan_tokens_out": 500,
|
|
})
|
|
assert r.status_code == 200
|
|
|
|
events = (await client.get("/token-events/", params={"task_id": task["id"]})).json()
|
|
assert len(events) == 1
|
|
ev = events[0]
|
|
assert ev["tokens_in"] == 250 # 1000 // 4
|
|
assert ev["tokens_out"] == 125 # 500 // 4
|
|
assert ev["note"] == "workplan"
|
|
assert ev["measurement_kind"] == "allocated"
|
|
assert ev["raw_metadata"]["allocation_method"] == "workplan_prorated"
|
|
|
|
async def test_tier3_heuristic_fallback(self, client):
|
|
"""Tier 3: status=done with no token args → heuristic 1000/500, note='heuristic'."""
|
|
await _create_domain(client)
|
|
topic = await _create_topic(client)
|
|
ws = await _create_workstream(client, topic["id"])
|
|
task = await _create_task(client, ws["id"])
|
|
|
|
r = await client.patch(f"/tasks/{task['id']}", json={"status": "done"})
|
|
assert r.status_code == 200
|
|
|
|
events = (await client.get("/token-events/", params={"task_id": task["id"]})).json()
|
|
assert len(events) == 1
|
|
ev = events[0]
|
|
assert ev["tokens_in"] == 1000
|
|
assert ev["tokens_out"] == 500
|
|
assert ev["note"] == "heuristic"
|
|
assert ev["measurement_kind"] == "estimated"
|
|
assert ev["source_provider"] == "task_fallback"
|
|
|
|
async def test_suppress_token_event_skips_done_fallback(self, client):
|
|
"""File/cache sync can mark a task done without minting a heuristic event."""
|
|
await _create_domain(client)
|
|
topic = await _create_topic(client)
|
|
ws = await _create_workstream(client, topic["id"])
|
|
task = await _create_task(client, ws["id"])
|
|
|
|
r = await client.patch(f"/tasks/{task['id']}", json={
|
|
"status": "done",
|
|
"suppress_token_event": True,
|
|
})
|
|
assert r.status_code == 200
|
|
assert r.json()["status"] == "done"
|
|
|
|
events = (await client.get("/token-events/", params={"task_id": task["id"]})).json()
|
|
assert events == []
|
|
|
|
async def test_repeated_done_update_does_not_duplicate_event(self, client):
|
|
"""Only the transition into done records token usage."""
|
|
await _create_domain(client)
|
|
topic = await _create_topic(client)
|
|
ws = await _create_workstream(client, topic["id"])
|
|
task = await _create_task(client, ws["id"])
|
|
|
|
r = await client.patch(f"/tasks/{task['id']}", json={"status": "done"})
|
|
assert r.status_code == 200
|
|
r = await client.patch(f"/tasks/{task['id']}", json={"status": "done"})
|
|
assert r.status_code == 200
|
|
|
|
events = (await client.get("/token-events/", params={"task_id": task["id"]})).json()
|
|
assert len(events) == 1
|
|
|
|
async def test_non_done_status_creates_no_event(self, client):
|
|
"""Non-done status updates never create a token event."""
|
|
await _create_domain(client)
|
|
topic = await _create_topic(client)
|
|
ws = await _create_workstream(client, topic["id"])
|
|
task = await _create_task(client, ws["id"])
|
|
|
|
r = await client.patch(f"/tasks/{task['id']}", json={"status": "progress"})
|
|
assert r.status_code == 200
|
|
|
|
events = (await client.get("/token-events/", params={"task_id": task["id"]})).json()
|
|
assert events == []
|