diff --git a/api/main.py b/api/main.py index 8f277ff..92fbc79 100644 --- a/api/main.py +++ b/api/main.py @@ -17,6 +17,7 @@ from api.routers import interface_changes from api.routers import flows from api.routers import recently_on_scope from api.routers import reconciliation +from api.routers import execution class ETagMiddleware(BaseHTTPMiddleware): @@ -102,6 +103,7 @@ app.include_router(token_events.router) app.include_router(interface_changes.router) app.include_router(flows.router) app.include_router(reconciliation.router) +app.include_router(execution.router) app.include_router(state.router) app.include_router(policy.router) diff --git a/api/models/__init__.py b/api/models/__init__.py index c3377d0..5bf0fb7 100644 --- a/api/models/__init__.py +++ b/api/models/__init__.py @@ -21,6 +21,7 @@ from api.models.tpsc import TPSCCatalog, TPSCSnapshot, TPSCEntry from api.models.doi_cache import DOICache from api.models.token_event import TokenEvent from api.models.interface_change import InterfaceChange +from api.models.workplan_launch_request import WorkplanLaunchRequest __all__ = [ "Base", @@ -46,4 +47,5 @@ __all__ = [ "DOICache", "TokenEvent", "InterfaceChange", + "WorkplanLaunchRequest", ] diff --git a/api/models/workplan_launch_request.py b/api/models/workplan_launch_request.py new file mode 100644 index 0000000..72b770e --- /dev/null +++ b/api/models/workplan_launch_request.py @@ -0,0 +1,39 @@ +import uuid + +from sqlalchemy import Boolean, ForeignKey, String, Text +from sqlalchemy.dialects.postgresql import JSONB, UUID +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from api.models.base import Base, TimestampMixin, new_uuid + + +class WorkplanLaunchRequest(Base, TimestampMixin): + __tablename__ = "workplan_launch_requests" + + id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), primary_key=True, default=new_uuid + ) + workstream_id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), + ForeignKey("workstreams.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + requested_by: Mapped[str] = mapped_column(String(100), nullable=False, default="dashboard") + requested_actor: Mapped[str | None] = mapped_column(String(100), nullable=True) + launch_mode: Mapped[str] = mapped_column(String(20), nullable=False, default="queued", index=True) + concurrency_mode: Mapped[str] = mapped_column(String(20), nullable=False, default="sequential", index=True) + priority: Mapped[str | None] = mapped_column(String(20), nullable=True, index=True) + repo_id: Mapped[uuid.UUID | None] = mapped_column( + UUID(as_uuid=True), + ForeignKey("managed_repos.id", ondelete="SET NULL"), + nullable=True, + index=True, + ) + branch_preference: Mapped[str | None] = mapped_column(Text, nullable=True) + immediate_pickup: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False, server_default="false") + status: Mapped[str] = mapped_column(String(20), nullable=False, default="requested", server_default="requested", index=True) + notes: Mapped[str | None] = mapped_column(Text, nullable=True) + request_metadata: Mapped[dict] = mapped_column(JSONB, nullable=False, default=dict, server_default="{}") + + workstream: Mapped["Workstream"] = relationship("Workstream", back_populates="launch_requests") # noqa: F821 diff --git a/api/models/workstream.py b/api/models/workstream.py index 9b3223b..bf38be1 100644 --- a/api/models/workstream.py +++ b/api/models/workstream.py @@ -1,7 +1,7 @@ import uuid -from datetime import date +from datetime import date, datetime -from sqlalchemy import Date, ForeignKey, Integer, String, Text +from sqlalchemy import Date, DateTime, ForeignKey, Integer, String, Text from sqlalchemy.dialects.postgresql import UUID from sqlalchemy.orm import Mapped, mapped_column, relationship @@ -27,6 +27,18 @@ class Workstream(Base, TimestampMixin): due_date: Mapped[date | None] = mapped_column(Date, nullable=True) planning_priority: Mapped[str | None] = mapped_column(String(20), nullable=True, index=True) planning_order: Mapped[int | None] = mapped_column(Integer, nullable=True, index=True) + execution_state: Mapped[str] = mapped_column( + String(20), nullable=False, default="manual", server_default="manual", index=True + ) + launch_mode: Mapped[str] = mapped_column( + String(20), nullable=False, default="manual", server_default="manual", index=True + ) + concurrency_mode: Mapped[str] = mapped_column( + String(20), nullable=False, default="sequential", server_default="sequential", index=True + ) + queue_rank: Mapped[int | None] = mapped_column(Integer, nullable=True, index=True) + execution_group: Mapped[str | None] = mapped_column(String(100), nullable=True, index=True) + scheduled_for: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True, index=True) repo_id: Mapped[uuid.UUID | None] = mapped_column( UUID(as_uuid=True), @@ -53,3 +65,6 @@ class Workstream(Base, TimestampMixin): progress_events: Mapped[list["ProgressEvent"]] = relationship( # noqa: F821 "ProgressEvent", back_populates="workstream", lazy="selectin" ) + launch_requests: Mapped[list["WorkplanLaunchRequest"]] = relationship( # noqa: F821 + "WorkplanLaunchRequest", back_populates="workstream", lazy="selectin" + ) diff --git a/api/routers/execution.py b/api/routers/execution.py new file mode 100644 index 0000000..28bb363 --- /dev/null +++ b/api/routers/execution.py @@ -0,0 +1,196 @@ +import uuid + +from fastapi import APIRouter, Depends, HTTPException, Query, status +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from api.database import get_session +from api.models.task import Task, TaskStatus +from api.models.workplan_launch_request import WorkplanLaunchRequest +from api.models.workstream import Workstream +from api.models.workstream_dependency import WorkstreamDependency +from api.schemas.execution import ( + ExecutionIntentRead, + ExecutionIntentUpdate, + ExecutionSemantics, + LaunchRequestCreate, + LaunchRequestRead, + WorkplanQueueItem, +) +from api.services.execution_queue import ( + ACTIVITY_CORE_RESPONSIBILITIES, + CONCURRENCY_MODES, + EXECUTION_STATES, + LAUNCH_MODES, + STATE_HUB_RESPONSIBILITIES, + execution_state_for_launch, + queue_sort_key, + workstream_blockers, +) +from api.workplan_status import CLOSED_WORKSTREAM_STATUSES, normalize_workstream_status + +router = APIRouter(prefix="/execution", tags=["execution"]) + + +@router.get("/semantics", response_model=ExecutionSemantics) +async def execution_semantics() -> ExecutionSemantics: + return ExecutionSemantics( + execution_states=EXECUTION_STATES, + launch_modes=LAUNCH_MODES, + concurrency_modes=CONCURRENCY_MODES, + state_hub_responsibility=STATE_HUB_RESPONSIBILITIES, + activity_core_responsibility=ACTIVITY_CORE_RESPONSIBILITIES, + ) + + +@router.patch("/workstreams/{workstream_id}/intent", response_model=ExecutionIntentRead) +async def update_execution_intent( + workstream_id: uuid.UUID, + body: ExecutionIntentUpdate, + session: AsyncSession = Depends(get_session), +) -> ExecutionIntentRead: + ws = await session.get(Workstream, workstream_id) + if ws is None: + raise HTTPException(status_code=404, detail="Workstream not found") + + for field, value in body.model_dump(exclude_unset=True).items(): + setattr(ws, field, value) + await session.commit() + await session.refresh(ws) + return _intent_read(ws) + + +@router.get("/workplan-stack", response_model=list[WorkplanQueueItem]) +async def workplan_stack( + include_manual: bool = Query(True), + include_blocked: bool = Query(True), + session: AsyncSession = Depends(get_session), +) -> list[WorkplanQueueItem]: + result = await session.execute(select(Workstream)) + workstreams = [ + ws for ws in result.scalars().all() + if normalize_workstream_status(ws.status) not in CLOSED_WORKSTREAM_STATUSES + ] + ws_by_id = {ws.id: ws for ws in workstreams} + ws_status = {ws.id: normalize_workstream_status(ws.status) for ws in workstreams} + + dep_result = await session.execute(select(WorkstreamDependency)) + ws_deps: dict[uuid.UUID, list[uuid.UUID]] = {} + task_deps: dict[uuid.UUID, list[uuid.UUID]] = {} + for dep in dep_result.scalars().all(): + if dep.to_workstream_id is not None: + ws_deps.setdefault(dep.from_workstream_id, []).append(dep.to_workstream_id) + if dep.to_task_id is not None: + task_deps.setdefault(dep.from_workstream_id, []).append(dep.to_task_id) + + task_ids = [task_id for ids in task_deps.values() for task_id in ids] + task_status: dict[uuid.UUID, str] = {} + if task_ids: + task_result = await session.execute(select(Task).where(Task.id.in_(task_ids))) + task_status = {task.id: _task_status(task.status) for task in task_result.scalars().all()} + + items: list[WorkplanQueueItem] = [] + for ws in workstreams: + if not include_manual and ws.execution_state == "manual": + continue + lifecycle_status = normalize_workstream_status(ws.status) + blocked_ws = [ + blocker for blocker in workstream_blockers(ws.id, ws_deps, ws_status) + if blocker in ws_by_id or blocker in ws_status + ] + blocked_tasks = [ + task_id for task_id in task_deps.get(ws.id, []) + if task_status.get(task_id) not in {"done", "cancelled"} + ] + eligible = lifecycle_status != "blocked" and not blocked_ws and not blocked_tasks + if not include_blocked and not eligible: + continue + sort_key = queue_sort_key(ws, eligible=eligible) + items.append(WorkplanQueueItem( + workstream_id=ws.id, + slug=ws.slug, + title=ws.title, + status=lifecycle_status, + repo_id=ws.repo_id, + planning_priority=ws.planning_priority, + planning_order=ws.planning_order, + execution_state=ws.execution_state, + launch_mode=ws.launch_mode, + concurrency_mode=ws.concurrency_mode, + queue_rank=ws.queue_rank, + execution_group=ws.execution_group, + scheduled_for=ws.scheduled_for, + eligible=eligible, + blocked_by_workstream_ids=blocked_ws, + blocked_by_task_ids=blocked_tasks, + sort_key=sort_key, + )) + return sorted(items, key=lambda item: item.sort_key) + + +@router.post( + "/launch-requests", + response_model=LaunchRequestRead, + status_code=status.HTTP_201_CREATED, +) +async def create_launch_request( + body: LaunchRequestCreate, + session: AsyncSession = Depends(get_session), +) -> WorkplanLaunchRequest: + ws = await session.get(Workstream, body.workstream_id) + if ws is None: + raise HTTPException(status_code=404, detail="Workstream not found") + + launch_request = WorkplanLaunchRequest( + workstream_id=ws.id, + requested_by=body.requested_by, + requested_actor=body.requested_actor, + launch_mode=body.launch_mode, + concurrency_mode=body.concurrency_mode, + priority=body.priority or ws.planning_priority, + repo_id=body.repo_id or ws.repo_id, + branch_preference=body.branch_preference, + immediate_pickup=body.immediate_pickup, + notes=body.notes, + request_metadata=body.request_metadata, + ) + ws.launch_mode = body.launch_mode + ws.concurrency_mode = body.concurrency_mode + ws.execution_state = execution_state_for_launch(body.launch_mode, body.immediate_pickup) + session.add(launch_request) + await session.commit() + await session.refresh(launch_request) + return launch_request + + +@router.get("/launch-requests", response_model=list[LaunchRequestRead]) +async def list_launch_requests( + workstream_id: uuid.UUID | None = None, + request_status: str | None = None, + session: AsyncSession = Depends(get_session), +) -> list[WorkplanLaunchRequest]: + q = select(WorkplanLaunchRequest).order_by(WorkplanLaunchRequest.created_at.desc()) + if workstream_id: + q = q.where(WorkplanLaunchRequest.workstream_id == workstream_id) + if request_status: + q = q.where(WorkplanLaunchRequest.status == request_status) + result = await session.execute(q) + return list(result.scalars().all()) + + +def _intent_read(ws: Workstream) -> ExecutionIntentRead: + return ExecutionIntentRead( + workstream_id=ws.id, + execution_state=ws.execution_state, + launch_mode=ws.launch_mode, + concurrency_mode=ws.concurrency_mode, + queue_rank=ws.queue_rank, + execution_group=ws.execution_group, + scheduled_for=ws.scheduled_for, + ) + + +def _task_status(status_value: TaskStatus | str) -> str: + if hasattr(status_value, "value"): + return status_value.value + return str(status_value or "").strip().lower() diff --git a/api/schemas/execution.py b/api/schemas/execution.py new file mode 100644 index 0000000..60799e6 --- /dev/null +++ b/api/schemas/execution.py @@ -0,0 +1,91 @@ +import uuid +from datetime import datetime +from typing import Literal + +from pydantic import BaseModel, ConfigDict, Field + + +ExecutionState = Literal["manual", "queued", "scheduled", "launching", "paused", "completed", "cancelled"] +LaunchMode = Literal["manual", "queued", "scheduled", "immediate"] +ConcurrencyMode = Literal["sequential", "parallel"] +LaunchRequestStatus = Literal["requested", "accepted", "completed", "cancelled"] + + +class ExecutionIntentUpdate(BaseModel): + execution_state: ExecutionState | None = None + launch_mode: LaunchMode | None = None + concurrency_mode: ConcurrencyMode | None = None + queue_rank: int | None = None + execution_group: str | None = None + scheduled_for: datetime | None = None + + +class ExecutionIntentRead(BaseModel): + workstream_id: uuid.UUID + execution_state: ExecutionState + launch_mode: LaunchMode + concurrency_mode: ConcurrencyMode + queue_rank: int | None = None + execution_group: str | None = None + scheduled_for: datetime | None = None + + +class WorkplanQueueItem(BaseModel): + workstream_id: uuid.UUID + slug: str + title: str + status: str + repo_id: uuid.UUID | None = None + planning_priority: str | None = None + planning_order: int | None = None + execution_state: ExecutionState + launch_mode: LaunchMode + concurrency_mode: ConcurrencyMode + queue_rank: int | None = None + execution_group: str | None = None + scheduled_for: datetime | None = None + eligible: bool + blocked_by_workstream_ids: list[uuid.UUID] = Field(default_factory=list) + blocked_by_task_ids: list[uuid.UUID] = Field(default_factory=list) + sort_key: list[str | int] = Field(default_factory=list) + + +class LaunchRequestCreate(BaseModel): + workstream_id: uuid.UUID + requested_by: str = "dashboard" + requested_actor: str | None = None + launch_mode: LaunchMode = "queued" + concurrency_mode: ConcurrencyMode = "sequential" + priority: str | None = None + repo_id: uuid.UUID | None = None + branch_preference: str | None = None + immediate_pickup: bool = False + notes: str | None = None + request_metadata: dict = Field(default_factory=dict) + + +class LaunchRequestRead(BaseModel): + model_config = ConfigDict(from_attributes=True) + id: uuid.UUID + workstream_id: uuid.UUID + requested_by: str + requested_actor: str | None = None + launch_mode: LaunchMode + concurrency_mode: ConcurrencyMode + priority: str | None = None + repo_id: uuid.UUID | None = None + branch_preference: str | None = None + immediate_pickup: bool + status: LaunchRequestStatus + notes: str | None = None + request_metadata: dict = Field(default_factory=dict) + created_at: datetime + updated_at: datetime + + +class ExecutionSemantics(BaseModel): + execution_states: dict[str, str] + launch_modes: dict[str, str] + concurrency_modes: dict[str, str] + state_hub_responsibility: list[str] + activity_core_responsibility: list[str] diff --git a/api/schemas/workstream.py b/api/schemas/workstream.py index 29fb614..b7e8a0f 100644 --- a/api/schemas/workstream.py +++ b/api/schemas/workstream.py @@ -16,6 +16,9 @@ WorkstreamStatus = Literal[ "finished", "archived", ] +ExecutionState = Literal["manual", "queued", "scheduled", "launching", "paused", "completed", "cancelled"] +LaunchMode = Literal["manual", "queued", "scheduled", "immediate"] +ConcurrencyMode = Literal["sequential", "parallel"] class WorkstreamStatusMixin(BaseModel): @@ -35,6 +38,12 @@ class WorkstreamCreate(WorkstreamStatusMixin): due_date: date | None = None planning_priority: str | None = None planning_order: int | None = None + execution_state: ExecutionState = "manual" + launch_mode: LaunchMode = "manual" + concurrency_mode: ConcurrencyMode = "sequential" + queue_rank: int | None = None + execution_group: str | None = None + scheduled_for: datetime | None = None repo_id: uuid.UUID | None = None # GEMS primary: the owning repository repo_goal_id: uuid.UUID | None = None @@ -47,6 +56,12 @@ class WorkstreamUpdate(WorkstreamStatusMixin): due_date: date | None = None planning_priority: str | None = None planning_order: int | None = None + execution_state: ExecutionState | None = None + launch_mode: LaunchMode | None = None + concurrency_mode: ConcurrencyMode | None = None + queue_rank: int | None = None + execution_group: str | None = None + scheduled_for: datetime | None = None repo_id: uuid.UUID | None = None repo_goal_id: uuid.UUID | None = None @@ -65,6 +80,12 @@ class WorkstreamRead(WorkstreamStatusMixin): due_date: date | None = None planning_priority: str | None = None planning_order: int | None = None + execution_state: ExecutionState = "manual" + launch_mode: LaunchMode = "manual" + concurrency_mode: ConcurrencyMode = "sequential" + queue_rank: int | None = None + execution_group: str | None = None + scheduled_for: datetime | None = None created_at: datetime updated_at: datetime diff --git a/api/services/execution_queue.py b/api/services/execution_queue.py new file mode 100644 index 0000000..07f86ff --- /dev/null +++ b/api/services/execution_queue.py @@ -0,0 +1,97 @@ +from __future__ import annotations + +from typing import Any + +from api.workplan_status import normalize_workstream_status + + +EXECUTION_STATES = { + "manual": "Not queued for autonomous pickup; humans or agents may still work manually.", + "queued": "Candidate for ordered pickup when dependencies and concurrency allow it.", + "scheduled": "Waiting for an external launch window; State Hub stores the requested time.", + "launching": "A launch request asks for immediate pickup or has been handed off.", + "paused": "Temporarily held outside the pickup stack.", + "completed": "Execution intent is closed; lifecycle status remains authoritative.", + "cancelled": "Execution intent was cancelled without changing lifecycle status.", +} + +LAUNCH_MODES = { + "manual": "Do not request automation; keep intent visible only.", + "queued": "Place in the prioritized stack for later pickup.", + "scheduled": "Request pickup at or after a selected time.", + "immediate": "Request prompt activity-core or agent pickup.", +} + +CONCURRENCY_MODES = { + "sequential": "Respect queue order and avoid parallel pickup for the same group.", + "parallel": "Eligible for concurrent pickup with other ready work.", +} + +STATE_HUB_RESPONSIBILITIES = [ + "store lifecycle status separately from execution intent", + "rank candidate workplans and expose dependency-aware eligibility", + "record launch requests and handoff metadata durably", + "surface manual, queued, scheduled, and immediate intent to operators", +] + +ACTIVITY_CORE_RESPONSIBILITIES = [ + "own schedules, wakeups, and recurring automation", + "dispatch coding agents and coordinate parallel execution", + "acknowledge, run, and complete launch requests when available", +] + +EXECUTION_STATE_RANK = { + "launching": 0, + "queued": 1, + "scheduled": 2, + "manual": 3, + "paused": 4, + "completed": 5, + "cancelled": 6, +} + +PRIORITY_RANK = { + "critical": 0, + "high": 1, + "medium": 2, + "low": 3, +} + +CLOSED_WORKSTREAM_STATUSES = {"finished", "archived"} + + +def execution_state_for_launch(launch_mode: str, immediate_pickup: bool = False) -> str: + mode = (launch_mode or "queued").strip().lower() + if immediate_pickup or mode == "immediate": + return "launching" + if mode == "scheduled": + return "scheduled" + if mode == "manual": + return "manual" + return "queued" + + +def workstream_blockers( + workstream_id: Any, + dependency_targets: dict[Any, list[Any]], + workstream_status: dict[Any, str], +) -> list[Any]: + blockers = [] + for target_id in dependency_targets.get(workstream_id, []): + target_status = normalize_workstream_status(workstream_status.get(target_id)) + if target_status not in CLOSED_WORKSTREAM_STATUSES: + blockers.append(target_id) + return blockers + + +def queue_sort_key(workstream: Any, *, eligible: bool) -> list[int | str]: + priority = str(getattr(workstream, "planning_priority", "") or "").strip().lower() + execution_state = str(getattr(workstream, "execution_state", "") or "manual").strip().lower() + return [ + 0 if eligible else 1, + EXECUTION_STATE_RANK.get(execution_state, 99), + PRIORITY_RANK.get(priority, 50), + getattr(workstream, "queue_rank", None) if getattr(workstream, "queue_rank", None) is not None else 999_999, + getattr(workstream, "planning_order", None) if getattr(workstream, "planning_order", None) is not None else 999_999, + str(getattr(workstream, "slug", "") or ""), + ] diff --git a/dashboard/src/workplan-queue.md b/dashboard/src/workplan-queue.md new file mode 100644 index 0000000..b3af3cf --- /dev/null +++ b/dashboard/src/workplan-queue.md @@ -0,0 +1,291 @@ +--- +title: Workplan Queue +--- + +```js +import {apiFetch, waitForVisible, pollDelay, POLL_HEAVY} from "./components/config.js"; +``` + +```js +const queueState = (async function*() { + let failures = 0; + while (true) { + let stack = [], semantics = {}, ok = false; + try { + const [stackResponse, semanticsResponse] = await Promise.all([ + apiFetch("/execution/workplan-stack"), + apiFetch("/execution/semantics"), + ]); + ok = stackResponse.ok && semanticsResponse.ok; + if (ok) { + [stack, semantics] = await Promise.all([ + stackResponse.json(), + semanticsResponse.json(), + ]); + } + } catch {} + failures = ok ? 0 : failures + 1; + yield {stack, semantics, ok, ts: new Date()}; + await waitForVisible(pollDelay({ok, base: POLL_HEAVY, failures})); + } +})(); +``` + +```js +const stack = queueState.stack ?? []; +const semantics = queueState.semantics ?? {}; +const _ok = queueState.ok ?? false; +const _ts = queueState.ts; +``` + +# Workplan Queue + +```js +display(html`
No queue candidates.
`); +} else { + display(html`| State | +Rank | +Workplan | +Lifecycle | +Priority | +Eligibility | +Blocked By | +Intent | +
|---|---|---|---|---|---|---|---|
| ${row.execution_state} | +${row.queue_rank ?? row.planning_order ?? "—"} | +${row.slug} ${row.title} |
+ ${row.status} | +${row.planning_priority ?? "—"} | +${statusCell(row)} | +${blockers(row)} | +${queueControls(row)} | +