generated from coulomb/repo-seed
feat(token-tracking): repo aggregation via graph walk (task→workstream→repo)
By Repo now resolves via the full chain rather than requiring repo_id directly on the token event: 1. token_events.repo_id (direct) 2. → workstreams.repo_id (via workstream_id) 3. → task.workstream_id → workstreams.repo_id (via task_id) Changes: - Auto-populate repo_id on token events at creation time (both the token_events router and the tasks router) - New GET /token-events/by-repo/ endpoint with RepoTokenSummary schema; returns tokens_in/out/total, event_count, by_model, by_note per repo - Dashboard By Repo section uses /by-repo/ directly and shows repo_slug instead of a truncated UUID - Backfilled the three existing events (userbased) with repo_id via SQL 185 tests pass. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -8,6 +8,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from api.database import get_session
|
||||
from api.models.task import Task, TaskStatus
|
||||
from api.models.token_event import TokenEvent
|
||||
from api.models.workstream import Workstream
|
||||
from api.schemas.task import TaskCreate, TaskRead, TaskUpdate
|
||||
|
||||
router = APIRouter(prefix="/tasks", tags=["tasks"])
|
||||
@@ -104,9 +105,14 @@ async def update_task(
|
||||
# Tier 3: heuristic fallback
|
||||
tin, tout, tnote = 1000, 500, "heuristic"
|
||||
|
||||
# Resolve repo_id via workstream
|
||||
ws = await session.get(Workstream, task.workstream_id)
|
||||
repo_id = ws.repo_id if ws else None
|
||||
|
||||
event = TokenEvent(
|
||||
task_id=task_id,
|
||||
workstream_id=task.workstream_id,
|
||||
repo_id=repo_id,
|
||||
tokens_in=tin,
|
||||
tokens_out=tout,
|
||||
model=token_data.get("model"),
|
||||
|
||||
@@ -6,9 +6,11 @@ from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from api.database import get_session
|
||||
from api.models.managed_repo import ManagedRepo
|
||||
from api.models.task import Task
|
||||
from api.models.token_event import TokenEvent
|
||||
from api.schemas.token_event import TokenEventCreate, TokenEventRead, TokenSummary
|
||||
from api.models.workstream import Workstream
|
||||
from api.schemas.token_event import RepoTokenSummary, TokenEventCreate, TokenEventRead, TokenSummary
|
||||
|
||||
router = APIRouter(prefix="/token-events", tags=["token-events"])
|
||||
|
||||
@@ -26,6 +28,12 @@ async def create_token_event(
|
||||
if task:
|
||||
data["workstream_id"] = task.workstream_id
|
||||
|
||||
# Auto-populate repo_id from workstream if not provided
|
||||
if data.get("workstream_id") and not data.get("repo_id"):
|
||||
ws = await session.get(Workstream, data["workstream_id"])
|
||||
if ws and ws.repo_id:
|
||||
data["repo_id"] = ws.repo_id
|
||||
|
||||
event = TokenEvent(**data)
|
||||
session.add(event)
|
||||
await session.commit()
|
||||
@@ -90,6 +98,74 @@ async def get_token_summary(
|
||||
)
|
||||
|
||||
|
||||
@router.get("/by-repo/", response_model=list[RepoTokenSummary])
|
||||
async def get_tokens_by_repo(
|
||||
session: AsyncSession = Depends(get_session),
|
||||
) -> list[RepoTokenSummary]:
|
||||
"""Aggregate token consumption per repo, resolving via the full graph.
|
||||
|
||||
Resolution order for each event:
|
||||
1. token_events.repo_id (direct)
|
||||
2. → workstreams.repo_id (via workstream_id)
|
||||
3. → task.workstream_id → workstreams.repo_id (via task_id)
|
||||
|
||||
Only events that resolve to a repo are included.
|
||||
"""
|
||||
# Fetch all events, workstreams, repos in three queries (avoids N+1)
|
||||
events_result = await session.execute(select(TokenEvent))
|
||||
events = list(events_result.scalars().all())
|
||||
|
||||
ws_result = await session.execute(select(Workstream))
|
||||
ws_map: dict[uuid.UUID, Workstream] = {w.id: w for w in ws_result.scalars().all()}
|
||||
|
||||
task_result = await session.execute(select(Task))
|
||||
task_map: dict[uuid.UUID, Task] = {t.id: t for t in task_result.scalars().all()}
|
||||
|
||||
repo_result = await session.execute(select(ManagedRepo))
|
||||
repo_map: dict[uuid.UUID, ManagedRepo] = {r.id: r for r in repo_result.scalars().all()}
|
||||
|
||||
def resolve_repo_id(e: TokenEvent) -> uuid.UUID | None:
|
||||
if e.repo_id:
|
||||
return e.repo_id
|
||||
ws_id = e.workstream_id
|
||||
if not ws_id and e.task_id and e.task_id in task_map:
|
||||
ws_id = task_map[e.task_id].workstream_id
|
||||
if ws_id and ws_id in ws_map:
|
||||
return ws_map[ws_id].repo_id
|
||||
return None
|
||||
|
||||
groups: dict[uuid.UUID, dict] = {}
|
||||
for e in events:
|
||||
rid = resolve_repo_id(e)
|
||||
if not rid or rid not in repo_map:
|
||||
continue
|
||||
if rid not in groups:
|
||||
groups[rid] = {
|
||||
"repo_id": rid,
|
||||
"repo_slug": repo_map[rid].slug,
|
||||
"tokens_in": 0,
|
||||
"tokens_out": 0,
|
||||
"event_count": 0,
|
||||
"by_model": defaultdict(int),
|
||||
"by_note": defaultdict(int),
|
||||
}
|
||||
g = groups[rid]
|
||||
g["tokens_in"] += e.tokens_in
|
||||
g["tokens_out"] += e.tokens_out
|
||||
g["event_count"] += 1
|
||||
if e.model:
|
||||
g["by_model"][e.model] += e.tokens_in + e.tokens_out
|
||||
g["by_note"][e.note or "unknown"] += e.tokens_in + e.tokens_out
|
||||
|
||||
return [
|
||||
RepoTokenSummary(
|
||||
**{k: (dict(v) if isinstance(v, defaultdict) else v) for k, v in g.items()},
|
||||
tokens_total=g["tokens_in"] + g["tokens_out"],
|
||||
)
|
||||
for g in sorted(groups.values(), key=lambda x: -(x["tokens_in"] + x["tokens_out"]))
|
||||
]
|
||||
|
||||
|
||||
@router.get("/", response_model=list[TokenEventRead])
|
||||
async def list_token_events(
|
||||
task_id: uuid.UUID | None = None,
|
||||
|
||||
@@ -50,3 +50,14 @@ class TokenSummary(BaseModel):
|
||||
event_count: int
|
||||
by_model: dict[str, int]
|
||||
by_agent: dict[str, int]
|
||||
|
||||
|
||||
class RepoTokenSummary(BaseModel):
|
||||
repo_id: uuid.UUID
|
||||
repo_slug: str
|
||||
tokens_in: int
|
||||
tokens_out: int
|
||||
tokens_total: int
|
||||
event_count: int
|
||||
by_model: dict[str, int]
|
||||
by_note: dict[str, int]
|
||||
|
||||
Reference in New Issue
Block a user