diff --git a/api/routers/state.py b/api/routers/state.py index 5c18ec6..01a8a10 100644 --- a/api/routers/state.py +++ b/api/routers/state.py @@ -4,6 +4,7 @@ from fastapi import APIRouter, Depends from fastapi.responses import JSONResponse from sqlalchemy import func, select, text from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import noload, selectinload from api.database import get_session, engine from api.flow_defs import assertion_result_to_dict, load_flow @@ -33,7 +34,7 @@ from api.schemas.state import ( WorkstreamTotals, ) from api.schemas.task import TaskRead -from api.schemas.topic import TopicWithWorkstreams +from api.schemas.topic import TopicRead, TopicWithWorkstreams from api.schemas.workstream import WorkstreamRead, WorkstreamWithTaskCounts, WorkstreamWithDeps from api.schemas.workstream_dependency import WorkstreamDepStub from task_flow_engine import FlowEngine @@ -47,9 +48,43 @@ async def get_summary(session: AsyncSession = Depends(get_session)) -> StateSumm # AsyncSession does not support concurrent operations (no gather on same session). 
topics_rows = await session.execute( - select(Topic).where(Topic.status != TopicStatus.archived).order_by(Topic.created_at) + select(Topic) + .options( + selectinload(Topic.domain), + noload(Topic.workstreams), + noload(Topic.decisions), + noload(Topic.progress_events), + ) + .where(Topic.status != TopicStatus.archived) + .order_by(Topic.created_at) ) topics = list(topics_rows.scalars().all()) + topic_ids = [t.id for t in topics] + + topic_workstreams: dict = {t.id: [] for t in topics} + if topic_ids: + topic_ws_rows = await session.execute( + select( + Workstream.topic_id, + Workstream.id, + Workstream.slug, + Workstream.title, + Workstream.status, + Workstream.owner, + Workstream.due_date, + ) + .where(Workstream.topic_id.in_(topic_ids)) + .order_by(Workstream.created_at) + ) + for topic_id, ws_id, slug, title, status, owner, due_date in topic_ws_rows: + topic_workstreams.setdefault(topic_id, []).append({ + "id": ws_id, + "slug": slug, + "title": title, + "status": status, + "owner": owner, + "due_date": due_date, + }) blocking_rows = await session.execute( select(Decision) @@ -60,17 +95,18 @@ async def get_summary(session: AsyncSession = Depends(get_session)) -> StateSumm blocking = list(blocking_rows.scalars().all()) blocked_rows = await session.execute( - select(Task).where(Task.status == TaskStatus.blocked).order_by(Task.created_at) + select(Task).options(noload("*")).where(Task.status == TaskStatus.blocked).order_by(Task.created_at) ) blocked = list(blocked_rows.scalars().all()) recent_rows = await session.execute( - select(ProgressEvent).order_by(ProgressEvent.created_at.desc()).limit(20) + select(ProgressEvent).options(noload("*")).order_by(ProgressEvent.created_at.desc()).limit(20) ) recent = list(recent_rows.scalars().all()) open_ws_rows = await session.execute( select(Workstream) + .options(noload("*")) .where(Workstream.status.in_(["active", "blocked"])) .order_by(Workstream.due_date.asc().nullslast(), Workstream.created_at) ) @@ -78,10 +114,12 @@ 
async def get_summary(session: AsyncSession = Depends(get_session)) -> StateSumm # Task counts per workstream (used to enrich open_workstreams) task_per_ws: dict = {} + task_statuses_per_ws: dict = {} for ws_id, tstat, cnt in await session.execute( select(Task.workstream_id, Task.status, func.count()).group_by(Task.workstream_id, Task.status) ): task_per_ws.setdefault(ws_id, {})[tstat] = cnt + task_statuses_per_ws.setdefault(ws_id, []).extend([_value(tstat)] * cnt) # Dependency graph for open workstreams open_ws_ids = [w.id for w in open_ws] @@ -108,7 +146,7 @@ async def get_summary(session: AsyncSession = Depends(get_session)) -> StateSumm extra_ids = dep_ws_ids - set(ws_lookup.keys()) if extra_ids: extra_rows = await session.execute( - select(Workstream).where(Workstream.id.in_(extra_ids)) + select(Workstream).options(noload("*")).where(Workstream.id.in_(extra_ids)) ) for w in extra_rows.scalars(): ws_lookup[w.id] = w @@ -159,7 +197,7 @@ async def get_summary(session: AsyncSession = Depends(get_session)) -> StateSumm flow_obj = { "status": w.status, "workstation": w.status, - "tasks": [{"status": _value(t.status)} for t in w.tasks], + "tasks": [{"status": status} for status in task_statuses_per_ws.get(w.id, [])], "dependencies": [ {"workstation": ws_lookup[d.to_workstream_id].status} for d in dep_rows @@ -259,7 +297,13 @@ async def get_summary(session: AsyncSession = Depends(get_session)) -> StateSumm return StateSummary( generated_at=datetime.now(tz=timezone.utc), totals=totals, - topics=[TopicWithWorkstreams.model_validate(t) for t in topics], + topics=[ + TopicWithWorkstreams( + **TopicRead.model_validate(t).model_dump(), + workstreams=topic_workstreams.get(t.id, []), + ) + for t in topics + ], blocking_decisions=[DecisionRead.model_validate(d) for d in blocking], blocked_tasks=[TaskRead.model_validate(t) for t in blocked], recent_progress=[ProgressEventRead.model_validate(e) for e in recent], @@ -291,7 +335,7 @@ async def get_summary(session: AsyncSession = 
Depends(get_session)) -> StateSumm async def _build_domain_summaries(session: AsyncSession) -> list[DomainSummary]: """Compute per-domain stats for the state summary.""" domains_rows = await session.execute( - select(Domain).where(Domain.status == "active").order_by(Domain.name) + select(Domain).options(noload("*")).where(Domain.status == "active").order_by(Domain.name) ) domains = list(domains_rows.scalars().all()) @@ -357,14 +401,17 @@ async def _derive_next_steps(session: AsyncSession) -> list[NextStep]: cutoff = datetime.now(tz=timezone.utc) - timedelta(days=7) resolved_rows = await session.execute( select(Decision) + .options(noload("*")) .where(Decision.status == DecisionStatus.resolved) .where(Decision.decided_at >= cutoff) .where(Decision.workstream_id.isnot(None)) .order_by(Decision.decided_at.desc()) + .limit(20) ) for decision in resolved_rows.scalars().all(): open_tasks_rows = await session.execute( select(Task) + .options(noload("*")) .where(Task.workstream_id == decision.workstream_id) .where(Task.status.in_([TaskStatus.todo, TaskStatus.in_progress])) ) @@ -374,7 +421,7 @@ async def _derive_next_steps(session: AsyncSession) -> list[NextStep]: task = min(open_tasks, key=lambda t: (_PRIORITY_RANK.get(t.priority, 99), t.created_at)) if task.id in seen_task_ids: continue - ws = await session.get(Workstream, decision.workstream_id) + ws = await session.get(Workstream, decision.workstream_id, options=[noload("*")]) domain_slug = await _get_domain_slug_for_workstream(ws, session) steps.append(NextStep( type="resolved_decision", @@ -392,57 +439,85 @@ async def _derive_next_steps(session: AsyncSession) -> list[NextStep]: seen_task_ids.add(task.id) # ── Signal 2: cleared dependencies ────────────────────────────────────── - all_dep_rows = await session.execute(select(WorkstreamDependency)) - all_deps = list(all_dep_rows.scalars().all()) + all_dep_rows = await session.execute( + select( + WorkstreamDependency.from_workstream_id, + 
WorkstreamDependency.to_workstream_id, + ).where(WorkstreamDependency.to_workstream_id.isnot(None)) + ) + all_deps = all_dep_rows.all() # Group from_workstream_id → set of to_workstream_ids dep_map: dict = {} - for d in all_deps: - dep_map.setdefault(d.from_workstream_id, set()).add(d.to_workstream_id) + dep_ws_ids = set() + for from_ws_id, to_ws_id in all_deps: + dep_map.setdefault(from_ws_id, set()).add(to_ws_id) + dep_ws_ids.add(from_ws_id) + dep_ws_ids.add(to_ws_id) - for from_ws_id, to_ws_ids in dep_map.items(): - # All targets must be completed - all_done = True - for to_id in to_ws_ids: - to_ws = await session.get(Workstream, to_id) - if to_ws is None or to_ws.status != "completed": - all_done = False - break - if not all_done: - continue + ws_info = {} + if dep_ws_ids: + ws_rows = await session.execute( + select( + Workstream.id, + Workstream.status, + Workstream.title, + Workstream.slug, + Workstream.topic_id, + ).where(Workstream.id.in_(dep_ws_ids)) + ) + ws_info = { + ws_id: { + "status": status, + "title": title, + "slug": slug, + "topic_id": topic_id, + } + for ws_id, status, title, slug, topic_id in ws_rows + } - from_ws = await session.get(Workstream, from_ws_id) - if from_ws is None or from_ws.status not in ("active", "blocked"): - continue + ready_from_ws_ids = [ + from_ws_id + for from_ws_id, to_ws_ids in dep_map.items() + if ws_info.get(from_ws_id, {}).get("status") in ("active", "blocked") + and all(ws_info.get(to_id, {}).get("status") == "completed" for to_id in to_ws_ids) + ] + todo_by_ws: dict = {} + if ready_from_ws_ids: todo_rows = await session.execute( select(Task) - .where(Task.workstream_id == from_ws_id) + .options(noload("*")) + .where(Task.workstream_id.in_(ready_from_ws_ids)) .where(Task.status == TaskStatus.todo) ) - todo_tasks = list(todo_rows.scalars().all()) + for task in todo_rows.scalars().all(): + todo_by_ws.setdefault(task.workstream_id, []).append(task) + + for from_ws_id in ready_from_ws_ids: + from_ws = 
ws_info.get(from_ws_id, {}) + todo_tasks = todo_by_ws.get(from_ws_id, []) if not todo_tasks: continue task = min(todo_tasks, key=lambda t: (_PRIORITY_RANK.get(t.priority, 99), t.created_at)) if task.id in seen_task_ids: continue - domain_slug = await _get_domain_slug_for_workstream(from_ws, session) + domain_slug = await _get_domain_slug_for_topic(from_ws.get("topic_id"), session) _blocker_slugs = [] - for tid in to_ws_ids: - _ws = await session.get(Workstream, tid) - if _ws: - _blocker_slugs.append(_ws.slug) + for tid in dep_map[from_ws_id]: + if tid in ws_info: + _blocker_slugs.append(ws_info[tid]["slug"]) blocker_slugs = ", ".join(_blocker_slugs) steps.append(NextStep( type="dependency_cleared", domain=domain_slug, - workstream_id=from_ws.id, - workstream_title=from_ws.title, - workstream_slug=from_ws.slug, + workstream_id=from_ws_id, + workstream_title=from_ws["title"], + workstream_slug=from_ws["slug"], task_id=task.id, task_title=task.title, message=( - f"All dependencies of '{from_ws.title}' are completed ({blocker_slugs}) → " + f"All dependencies of '{from_ws['title']}' are completed ({blocker_slugs}) → " f"'{task.title}' is ready to start" ), )) @@ -455,10 +530,17 @@ async def _get_domain_slug_for_workstream(ws: Workstream | None, session: AsyncS """Get the domain slug for a workstream via its topic.""" if ws is None or ws.topic_id is None: return None - topic = await session.get(Topic, ws.topic_id) + return await _get_domain_slug_for_topic(ws.topic_id, session) + + +async def _get_domain_slug_for_topic(topic_id, session: AsyncSession) -> str | None: + """Get the domain slug for a topic id.""" + if topic_id is None: + return None + topic = await session.get(Topic, topic_id, options=[noload("*")]) if topic is None or topic.domain_id is None: return None - domain = await session.get(Domain, topic.domain_id) + domain = await session.get(Domain, topic.domain_id, options=[noload("*")]) return domain.slug if domain else None diff --git 
a/dashboard/src/capability-requests.md b/dashboard/src/capability-requests.md index 741cc6b..0786952 100644 --- a/dashboard/src/capability-requests.md +++ b/dashboard/src/capability-requests.md @@ -3,22 +3,24 @@ title: Capability Requests --- ```js -import {API} from "./components/config.js"; +import {API, apiFetch, pollDelay, sleep} from "./components/config.js"; const POLL = 30_000; ``` ```js // Live poll for capability requests const reqState = (async function*() { + let failures = 0; while (true) { let data = [], ok = false; try { - const r = await fetch(`${API}/capability-requests/`); + const r = await apiFetch("/capability-requests/"); ok = r.ok; data = ok ? await r.json() : []; } catch {} + failures = ok ? 0 : failures + 1; yield {data, ok, ts: new Date()}; - await new Promise(res => setTimeout(res, POLL)); + await sleep(pollDelay({ok, base: POLL, failures})); } })(); ``` @@ -198,14 +200,17 @@ display(Inputs.table(filtered.map(r => ({ ```js // Live poll for catalog entries const catalogState = (async function*() { + let failures = 0; while (true) { - let data = []; + let data = [], ok = false; try { - const r = await fetch(`${API}/capability-catalog/?status=all`); + const r = await apiFetch("/capability-catalog/?status=all"); + ok = r.ok; if (r.ok) data = await r.json(); } catch {} + failures = ok ? 
0 : failures + 1; yield data; - await new Promise(res => setTimeout(res, POLL)); + await sleep(pollDelay({ok, base: POLL, failures})); } })(); ``` diff --git a/dashboard/src/components/config.js b/dashboard/src/components/config.js index e84e170..881baec 100644 --- a/dashboard/src/components/config.js +++ b/dashboard/src/components/config.js @@ -1,2 +1,27 @@ export const API = "http://127.0.0.1:8000"; export const POLL = 15_000; +export const POLL_HEAVY = 60_000; +export const POLL_HIDDEN = 120_000; +export const FETCH_TIMEOUT = 12_000; + +export function pollDelay({ok = true, base = POLL, failures = 0} = {}) { + const hidden = typeof document !== "undefined" && document.visibilityState === "hidden"; + const failureDelay = ok ? base : Math.min(base * 2 ** Math.min(failures, 4), 300_000); + return hidden ? Math.max(failureDelay, POLL_HIDDEN) : failureDelay; +} + +export function sleep(ms) { + return new Promise(resolve => setTimeout(resolve, ms)); +} + +export async function apiFetch(path, options = {}) { + const url = path.startsWith("http") ? path : `${API}${path}`; + const timeout = options.timeout ?? FETCH_TIMEOUT; + const ctrl = new AbortController(); + const timer = setTimeout(() => ctrl.abort(), timeout); + try { + return await fetch(url, {...options, signal: ctrl.signal}); + } finally { + clearTimeout(timer); + } +} diff --git a/dashboard/src/contributions.md b/dashboard/src/contributions.md index 0bfa3ca..d5c43ff 100644 --- a/dashboard/src/contributions.md +++ b/dashboard/src/contributions.md @@ -3,22 +3,24 @@ title: Contributions --- ```js -import {API} from "./components/config.js"; +import {API, apiFetch, pollDelay, sleep} from "./components/config.js"; const POLL = 30_000; ``` ```js // Live poll for contributions const contribState = (async function*() { + let failures = 0; while (true) { let data = [], ok = false; try { - const r = await fetch(`${API}/contributions/`); + const r = await apiFetch("/contributions/"); ok = r.ok; data = ok ? 
await r.json() : []; } catch {} + failures = ok ? 0 : failures + 1; yield {data, ok, ts: new Date()}; - await new Promise(res => setTimeout(res, POLL)); + await sleep(pollDelay({ok, base: POLL, failures})); } })(); ``` diff --git a/dashboard/src/decisions.md b/dashboard/src/decisions.md index e67299d..fe9382f 100644 --- a/dashboard/src/decisions.md +++ b/dashboard/src/decisions.md @@ -3,18 +3,19 @@ title: Decisions --- ```js -import {API, POLL} from "./components/config.js"; +import {API, POLL_HEAVY, apiFetch, pollDelay, sleep} from "./components/config.js"; ``` ```js // Fetch decisions + topics (for domain context) in parallel const decState = (async function*() { + let failures = 0; while (true) { let data = [], ok = false; try { const [rd, rt] = await Promise.all([ - fetch(`${API}/decisions/?limit=500`), - fetch(`${API}/topics/`), + apiFetch("/decisions/?limit=500"), + apiFetch("/topics/"), ]); ok = rd.ok && rt.ok; if (ok) { @@ -43,8 +44,9 @@ const decState = (async function*() { }); } } catch {} + failures = ok ? 
0 : failures + 1; yield {data, ok, ts: new Date()}; - await new Promise(res => setTimeout(res, POLL)); + await sleep(pollDelay({ok, base: POLL_HEAVY, failures})); } })(); ``` diff --git a/dashboard/src/dependencies.md b/dashboard/src/dependencies.md index f047d59..bdee5c3 100644 --- a/dashboard/src/dependencies.md +++ b/dashboard/src/dependencies.md @@ -3,20 +3,21 @@ title: Dependencies --- ```js -import {API, POLL} from "./components/config.js"; +import {API, POLL_HEAVY, apiFetch, pollDelay, sleep} from "./components/config.js"; ``` ```js // Fetch workstreams + topics + summary (summary carries dep edges on open_workstreams) const depState = (async function*() { + let failures = 0; while (true) { let wsMap = {}, edges = [], ok = false; try { const [rw, rto, rr, rs] = await Promise.all([ - fetch(`${API}/workstreams/`), - fetch(`${API}/topics/`), - fetch(`${API}/repos/`), - fetch(`${API}/state/summary`), + apiFetch("/workstreams/"), + apiFetch("/topics/"), + apiFetch("/repos/"), + apiFetch("/state/summary", {timeout: 20_000}), ]); ok = rw.ok && rto.ok && rr.ok && rs.ok; if (ok) { @@ -38,8 +39,9 @@ const depState = (async function*() { } } } catch {} + failures = ok ? 
0 : failures + 1; yield {wsMap, edges, ok, ts: new Date()}; - await new Promise(res => setTimeout(res, POLL)); + await sleep(pollDelay({ok, base: POLL_HEAVY, failures})); } })(); ``` diff --git a/dashboard/src/domains.md b/dashboard/src/domains.md index a004378..5d7ecd4 100644 --- a/dashboard/src/domains.md +++ b/dashboard/src/domains.md @@ -3,25 +3,27 @@ title: Domains --- ```js -import {API, POLL} from "./components/config.js"; +import {API, POLL_HEAVY, apiFetch, pollDelay, sleep} from "./components/config.js"; ``` ```js const domainsState = (async function*() { + let failures = 0; while (true) { let domains = [], repos = [], ok = false; try { const [rd, rr] = await Promise.all([ - fetch(`${API}/domains/?status=all`), - fetch(`${API}/repos/`), + apiFetch("/domains/?status=all"), + apiFetch("/repos/"), ]); ok = rd.ok && rr.ok; if (ok) { [domains, repos] = await Promise.all([rd.json(), rr.json()]); } } catch {} + failures = ok ? 0 : failures + 1; yield {domains, repos, ok, ts: new Date()}; - await new Promise(res => setTimeout(res, POLL)); + await sleep(pollDelay({ok, base: POLL_HEAVY, failures})); } })(); ``` diff --git a/dashboard/src/extensions.md b/dashboard/src/extensions.md index fdc7245..2002993 100644 --- a/dashboard/src/extensions.md +++ b/dashboard/src/extensions.md @@ -3,19 +3,20 @@ title: Extension Points --- ```js -import {API, POLL} from "./components/config.js"; +import {API, POLL_HEAVY, apiFetch, pollDelay, sleep} from "./components/config.js"; ``` ```js const epState = (async function*() { + let failures = 0; while (true) { let data = [], ok = false; try { const [re, rw, rt, rr] = await Promise.all([ - fetch(`${API}/extension-points/`), - fetch(`${API}/workstreams/`), - fetch(`${API}/topics/`), - fetch(`${API}/repos/`), + apiFetch("/extension-points/"), + apiFetch("/workstreams/"), + apiFetch("/topics/"), + apiFetch("/repos/"), ]); ok = re.ok && rw.ok && rt.ok && rr.ok; if (ok) { @@ -36,8 +37,9 @@ const epState = (async function*() { }); } } catch 
{} + failures = ok ? 0 : failures + 1; yield {data, ok, ts: new Date()}; - await new Promise(res => setTimeout(res, POLL)); + await sleep(pollDelay({ok, base: POLL_HEAVY, failures})); } })(); ``` diff --git a/dashboard/src/goals.md b/dashboard/src/goals.md index c109537..abf44d4 100644 --- a/dashboard/src/goals.md +++ b/dashboard/src/goals.md @@ -3,20 +3,21 @@ title: Goals --- ```js -import {API} from "./components/config.js"; +import {API, apiFetch, pollDelay, sleep} from "./components/config.js"; const POLL = 20_000; ``` ```js const goalsState = (async function*() { + let failures = 0; while (true) { let domains = [], domainGoals = [], repoGoals = [], repos = [], ok = false; try { const [rd, rdg, rrg, rr] = await Promise.all([ - fetch(`${API}/domains/?status=active`), - fetch(`${API}/domain-goals/`), - fetch(`${API}/repo-goals/`), - fetch(`${API}/repos/`), + apiFetch("/domains/?status=active"), + apiFetch("/domain-goals/"), + apiFetch("/repo-goals/"), + apiFetch("/repos/"), ]); ok = rd.ok && rdg.ok && rrg.ok && rr.ok; if (ok) { @@ -25,8 +26,9 @@ const goalsState = (async function*() { ]); } } catch {} + failures = ok ? 
0 : failures + 1; yield {domains, domainGoals, repoGoals, repos, ok, ts: new Date()}; - await new Promise(res => setTimeout(res, POLL)); + await sleep(pollDelay({ok, base: POLL, failures})); } })(); ``` diff --git a/dashboard/src/inbox.md b/dashboard/src/inbox.md index 2b9d511..df6f448 100644 --- a/dashboard/src/inbox.md +++ b/dashboard/src/inbox.md @@ -3,21 +3,23 @@ title: Agent Inbox --- ```js -import {API, POLL} from "./components/config.js"; +import {API, apiFetch, pollDelay, sleep} from "./components/config.js"; ``` ```js // Live poll: messages list const inboxState = (async function*() { + let failures = 0; while (true) { let messages = [], ok = false; try { - const resp = await fetch(`${API}/messages/?limit=100`); + const resp = await apiFetch("/messages/?limit=100"); ok = resp.ok; if (ok) messages = await resp.json(); } catch {} + failures = ok ? 0 : failures + 1; yield {messages, ok, ts: new Date()}; - await new Promise(res => setTimeout(res, POLL)); + await sleep(pollDelay({ok, failures})); } })(); ``` diff --git a/dashboard/src/index.md b/dashboard/src/index.md index a518ea3..8f4a22b 100644 --- a/dashboard/src/index.md +++ b/dashboard/src/index.md @@ -3,23 +3,25 @@ title: Overview --- ```js -import {API, POLL} from "./components/config.js"; +import {API, POLL, POLL_HEAVY, apiFetch, pollDelay, sleep} from "./components/config.js"; ``` ```js -// Live polling — yields {data, ok, ts} every POLL ms +// Live polling — yields {data, ok, ts}; backs off when the API is slow/offline. const summaryState = (async function*() { + let failures = 0; while (true) { let data, ok = false; try { - const r = await fetch(`${API}/state/summary`); + const r = await apiFetch("/state/summary", {timeout: 20_000}); ok = r.ok; data = ok ? await r.json() : {error: `HTTP ${r.status}`}; } catch (e) { data = {error: "API unreachable"}; } + failures = ok ? 
0 : failures + 1; yield {data, ok, ts: new Date()}; - await new Promise(res => setTimeout(res, POLL)); + await sleep(pollDelay({ok, base: POLL_HEAVY, failures})); } })(); ``` @@ -49,17 +51,20 @@ refreshDecisions(); ```js // SBOM snapshots — repo coverage and total package count const sbomSnapState = (async function*() { + let failures = 0; while (true) { - let snapshots = [], totalPkgs = 0; + let snapshots = [], totalPkgs = 0, ok = false; try { - const r = await fetch(`${API}/sbom/snapshots/`); + const r = await apiFetch("/sbom/snapshots/"); + ok = r.ok; if (r.ok) { snapshots = await r.json(); totalPkgs = snapshots.reduce((s, sn) => s + (sn.entry_count ?? 0), 0); } } catch {} + failures = ok ? 0 : failures + 1; yield {snapshots, totalPkgs}; - await new Promise(res => setTimeout(res, POLL)); + await sleep(pollDelay({ok, base: POLL_HEAVY, failures})); } })(); ``` @@ -67,17 +72,20 @@ const sbomSnapState = (async function*() { ```js // Registered projects — milestone events tagged with registration const regsState = (async function*() { + let failures = 0; while (true) { - let rows = []; + let rows = [], ok = false; try { - const r = await fetch(`${API}/progress/?event_type=milestone&limit=500`); + const r = await apiFetch("/progress/?event_type=milestone&limit=500"); + ok = r.ok; if (r.ok) { const all = await r.json(); rows = all.filter(e => e.summary?.startsWith("Project registered with State Hub:")); } } catch {} + failures = ok ? 
0 : failures + 1; yield rows; - await new Promise(res => setTimeout(res, POLL)); + await sleep(pollDelay({ok, base: POLL_HEAVY, failures})); } })(); ``` @@ -85,15 +93,16 @@ const regsState = (async function*() { ```js // All-workstreams + all-tasks poll — drives the multi-mode chart const wsChartState = (async function*() { + let failures = 0; while (true) { let wsAll = [], ok = false; try { const [rw, rt, rto, rr, rwi] = await Promise.all([ - fetch(`${API}/workstreams/`), - fetch(`${API}/tasks/?limit=2000`), - fetch(`${API}/topics/`), - fetch(`${API}/repos/`), - fetch(`${API}/workstreams/workplan-index`), + apiFetch("/workstreams/"), + apiFetch("/tasks/?limit=2000"), + apiFetch("/topics/"), + apiFetch("/repos/"), + apiFetch("/workstreams/workplan-index"), ]); ok = rw.ok && rt.ok && rto.ok && rr.ok; if (ok) { @@ -132,8 +141,9 @@ const wsChartState = (async function*() { }); } } catch {} + failures = ok ? 0 : failures + 1; yield {wsAll, ok}; - await new Promise(res => setTimeout(res, POLL)); + await sleep(pollDelay({ok, base: POLL_HEAVY, failures})); } })(); ``` diff --git a/dashboard/src/interventions.md b/dashboard/src/interventions.md index c450ccd..299dd6c 100644 --- a/dashboard/src/interventions.md +++ b/dashboard/src/interventions.md @@ -3,20 +3,21 @@ title: Interventions --- ```js -import {API, POLL} from "./components/config.js"; +import {API, POLL_HEAVY, apiFetch, pollDelay, sleep} from "./components/config.js"; ``` ```js // Live poll: all tasks (filtered client-side) + workstreams + topics const interventionState = (async function*() { + let failures = 0; while (true) { let tasks = [], wsMap = {}, ok = false; try { const [rt, rw, rto, rr] = await Promise.all([ - fetch(`${API}/tasks/?limit=500`), - fetch(`${API}/workstreams/`), - fetch(`${API}/topics/`), - fetch(`${API}/repos/`), + apiFetch("/tasks/?limit=500"), + apiFetch("/workstreams/"), + apiFetch("/topics/"), + apiFetch("/repos/"), ]); ok = rt.ok && rw.ok && rto.ok && rr.ok; if (ok) { @@ -36,8 +37,9 @@ 
const interventionState = (async function*() { })); } } catch {} + failures = ok ? 0 : failures + 1; yield {tasks, ok, ts: new Date()}; - await new Promise(res => setTimeout(res, POLL)); + await sleep(pollDelay({ok, base: POLL_HEAVY, failures})); } })(); ``` diff --git a/dashboard/src/progress.md b/dashboard/src/progress.md index 9e93eae..907ffae 100644 --- a/dashboard/src/progress.md +++ b/dashboard/src/progress.md @@ -3,24 +3,26 @@ title: Progress --- ```js -import {API, POLL} from "./components/config.js"; +import {POLL_HEAVY, apiFetch, pollDelay, sleep} from "./components/config.js"; ``` ```js const progState = (async function*() { + let failures = 0; while (true) { let data = [], tokenEvents = [], ok = false; try { const [r1, r2] = await Promise.all([ - fetch(`${API}/progress/?limit=500`), - fetch(`${API}/token-events/?limit=1000`), + apiFetch("/progress/?limit=500"), + apiFetch("/token-events/?limit=1000"), ]); ok = r1.ok; data = ok ? await r1.json() : []; tokenEvents = r2.ok ? await r2.json() : []; } catch {} + failures = ok ? 
0 : failures + 1; yield {data, tokenEvents, ok, ts: new Date()}; - await new Promise(res => setTimeout(res, POLL)); + await sleep(pollDelay({ok, base: POLL_HEAVY, failures})); } })(); ``` diff --git a/dashboard/src/tasks.md b/dashboard/src/tasks.md index ce9e4b0..016421d 100644 --- a/dashboard/src/tasks.md +++ b/dashboard/src/tasks.md @@ -3,19 +3,20 @@ title: Tasks --- ```js -import {API, POLL} from "./components/config.js"; +import {API, POLL_HEAVY, apiFetch, pollDelay, sleep} from "./components/config.js"; ``` ```js const taskState = (async function*() { + let failures = 0; while (true) { let data = [], ok = false; try { const [rt, rw, rto, rr] = await Promise.all([ - fetch(`${API}/tasks/?limit=500`), - fetch(`${API}/workstreams/`), - fetch(`${API}/topics/`), - fetch(`${API}/repos/`), + apiFetch("/tasks/?limit=500"), + apiFetch("/workstreams/"), + apiFetch("/topics/"), + apiFetch("/repos/"), ]); ok = rt.ok && rw.ok && rto.ok && rr.ok; if (ok) { @@ -33,8 +34,9 @@ const taskState = (async function*() { })); } } catch {} + failures = ok ? 
0 : failures + 1; yield {data, ok, ts: new Date()}; - await new Promise(res => setTimeout(res, POLL)); + await sleep(pollDelay({ok, base: POLL_HEAVY, failures})); } })(); ``` diff --git a/dashboard/src/techdept.md b/dashboard/src/techdept.md index f2545ec..cb09bfb 100644 --- a/dashboard/src/techdept.md +++ b/dashboard/src/techdept.md @@ -3,19 +3,20 @@ title: Technical Debt --- ```js -import {API, POLL} from "./components/config.js"; +import {API, POLL_HEAVY, apiFetch, pollDelay, sleep} from "./components/config.js"; ``` ```js const tdState = (async function*() { + let failures = 0; while (true) { let data = [], ok = false; try { const [rt, rw, rto, rr] = await Promise.all([ - fetch(`${API}/technical-debt/`), - fetch(`${API}/workstreams/`), - fetch(`${API}/topics/`), - fetch(`${API}/repos/`), + apiFetch("/technical-debt/"), + apiFetch("/workstreams/"), + apiFetch("/topics/"), + apiFetch("/repos/"), ]); ok = rt.ok && rw.ok && rto.ok && rr.ok; if (ok) { @@ -36,8 +37,9 @@ const tdState = (async function*() { }); } } catch {} + failures = ok ? 
0 : failures + 1; yield {data, ok, ts: new Date()}; - await new Promise(res => setTimeout(res, POLL)); + await sleep(pollDelay({ok, base: POLL_HEAVY, failures})); } })(); ``` diff --git a/dashboard/src/todo.md b/dashboard/src/todo.md index ed6dee1..960d8a3 100644 --- a/dashboard/src/todo.md +++ b/dashboard/src/todo.md @@ -3,23 +3,24 @@ title: Todo --- ```js -import {API, POLL} from "./components/config.js"; +import {API, POLL_HEAVY, apiFetch, pollDelay, sleep} from "./components/config.js"; const THIS_REPO = "the-custodian"; ``` ```js // Live poll: tasks + workstreams + topics + contributions const todoState = (async function*() { + let failures = 0; while (true) { let tasks = [], contribs = [], improvements = [], wsMap = {}, ok = false; try { const [rt, rw, rto, rr, rc, ri] = await Promise.all([ - fetch(`${API}/tasks/?limit=500`), - fetch(`${API}/workstreams/`), - fetch(`${API}/topics/`), - fetch(`${API}/repos/`), - fetch(`${API}/contributions/`), - fetch(`${API}/technical-debt/?debt_type=dashboard-improvement`), + apiFetch("/tasks/?limit=500"), + apiFetch("/workstreams/"), + apiFetch("/topics/"), + apiFetch("/repos/"), + apiFetch("/contributions/"), + apiFetch("/technical-debt/?debt_type=dashboard-improvement"), ]); ok = rt.ok && rw.ok && rto.ok && rr.ok && rc.ok; if (ok) { @@ -42,8 +43,9 @@ const todoState = (async function*() { improvements = ri.ok ? (await ri.json()).filter(t => t.debt_type === "dashboard-improvement" && !CLOSED.has(t.status)) : []; } } catch {} + failures = ok ? 
0 : failures + 1; yield {tasks, contribs, improvements, ok, ts: new Date()}; - await new Promise(res => setTimeout(res, POLL)); + await sleep(pollDelay({ok, base: POLL_HEAVY, failures})); } })(); ``` diff --git a/dashboard/src/token-cost.md b/dashboard/src/token-cost.md index a24d20e..8516717 100644 --- a/dashboard/src/token-cost.md +++ b/dashboard/src/token-cost.md @@ -3,7 +3,7 @@ title: Token Cost --- ```js -import {API} from "./components/config.js"; +import {apiFetch, pollDelay, sleep} from "./components/config.js"; import {refCell} from "./components/ref-cell.js"; const POLL = 60_000; ``` @@ -11,14 +11,15 @@ const POLL = 60_000; ```js // Fetch token events, by-repo summary, workstreams, and tasks in parallel const tokenState = (async function*() { + let failures = 0; while (true) { let byRepo = [], events = [], wsMap = {}, taskMap = {}, ok = false; try { const [r1, r2, r3, r4] = await Promise.all([ - fetch(`${API}/token-events/by-repo/`), - fetch(`${API}/token-events/?limit=1000`), - fetch(`${API}/workstreams/`), - fetch(`${API}/tasks/`), + apiFetch("/token-events/by-repo/"), + apiFetch("/token-events/?limit=1000"), + apiFetch("/workstreams/"), + apiFetch("/tasks/"), ]); ok = r1.ok && r2.ok; if (ok) { @@ -34,8 +35,9 @@ const tokenState = (async function*() { for (const t of taskList) taskMap[t.id] = t; } } catch {} + failures = ok ? 
0 : failures + 1; yield {byRepo, events, wsMap, taskMap, ok, ts: new Date()}; - await new Promise(res => setTimeout(res, POLL)); + await sleep(pollDelay({ok, base: POLL, failures})); } })(); ``` diff --git a/dashboard/src/ui-feedback.md b/dashboard/src/ui-feedback.md index dff6702..e64a763 100644 --- a/dashboard/src/ui-feedback.md +++ b/dashboard/src/ui-feedback.md @@ -3,7 +3,7 @@ title: UI Feedback --- ```js -import {API, POLL} from "./components/config.js"; +import {API, POLL_HEAVY, apiFetch, pollDelay, sleep} from "./components/config.js"; ``` ```js @@ -47,10 +47,11 @@ function nextStep(current) { ```js const feedbackState = (async function*() { + let failures = 0; while (true) { let data = [], ok = false; try { - const r = await fetch(`${API}/technical-debt/?debt_type=dashboard-improvement`); + const r = await apiFetch("/technical-debt/?debt_type=dashboard-improvement"); ok = r.ok; if (ok) { const items = await r.json(); @@ -62,8 +63,9 @@ const feedbackState = (async function*() { }); } } catch {} + failures = ok ? 
0 : failures + 1; yield {data, ok, ts: new Date()}; - await new Promise(res => setTimeout(res, POLL)); + await sleep(pollDelay({ok, base: POLL_HEAVY, failures})); } })(); ``` diff --git a/dashboard/src/workstreams.md b/dashboard/src/workstreams.md index 136fe87..21ced66 100644 --- a/dashboard/src/workstreams.md +++ b/dashboard/src/workstreams.md @@ -3,20 +3,21 @@ title: Workstreams --- ```js -import {API, POLL} from "./components/config.js"; +import {API, POLL_HEAVY, apiFetch, pollDelay, sleep} from "./components/config.js"; ``` ```js // Fetch workstreams + topics + summary (for dep graph) in parallel const wsState = (async function*() { + let failures = 0; while (true) { let data = [], openWs = [], ok = false; try { const [rw, rt, rr, rs] = await Promise.all([ - fetch(`${API}/workstreams/`), - fetch(`${API}/topics/`), - fetch(`${API}/repos/`), - fetch(`${API}/state/summary`), + apiFetch("/workstreams/"), + apiFetch("/topics/"), + apiFetch("/repos/"), + apiFetch("/state/summary", {timeout: 20_000}), ]); ok = rw.ok && rt.ok && rr.ok && rs.ok; if (ok) { @@ -32,8 +33,9 @@ const wsState = (async function*() { openWs = summary.open_workstreams ?? []; } } catch {} + failures = ok ? 0 : failures + 1; yield {data, openWs, ok, ts: new Date()}; - await new Promise(res => setTimeout(res, POLL)); + await sleep(pollDelay({ok, base: POLL_HEAVY, failures})); } })(); ``` diff --git a/infra/README.md b/infra/README.md index eac4e42..1160ea8 100644 --- a/infra/README.md +++ b/infra/README.md @@ -17,9 +17,19 @@ The compose file is `infra/docker-compose.yml`. Copy `.env.example` to `.env` an ## Periodic Repo Sync — systemd user timer -The custodian sync timer runs `fix-consistency-all` every 15 minutes, keeping -workplan file state in sync with the state-hub DB automatically (belt-and-suspenders -alongside the per-repo git post-commit hooks). 
+The custodian sync timer runs `consistency_check.py --remote --all` every 15 +minutes, keeping workplan file state in sync with the state-hub DB automatically +(belt-and-suspenders alongside the per-repo git post-commit hooks). + +The all-repo remote sweep has three built-in safeguards: + +- A nonblocking process lock at `/tmp/custodian-consistency-remote-all.lock`; + if a prior sweep is still active, the next timer run exits cleanly. +- A wall-clock budget, defaulting to 300 seconds. Remaining repos are skipped + once the budget is exhausted. Override with `--max-seconds N` or set + `CONSISTENCY_REMOTE_ALL_MAX_SECONDS`. +- Warn-only sweeps exit 0 in `--remote --all` mode so the systemd unit only + enters the failed state for hard consistency failures. ### Installed unit files @@ -64,7 +74,7 @@ If systemd is not available, fall back to crontab: ```bash # Crontab fallback (run crontab -e and add): -*/15 * * * * curl -sf http://127.0.0.1:8000/state/health && cd ~/the-custodian/state-hub && .venv/bin/python scripts/consistency_check.py --all --fix >> /tmp/custodian-sync.log 2>&1 +*/15 * * * * curl -sf http://127.0.0.1:8000/state/health && cd ~/the-custodian/state-hub && .venv/bin/python scripts/consistency_check.py --remote --all >> /tmp/custodian-sync.log 2>&1 ``` --- diff --git a/scripts/consistency_check.py b/scripts/consistency_check.py index 61a9d20..a3440f9 100644 --- a/scripts/consistency_check.py +++ b/scripts/consistency_check.py @@ -38,11 +38,14 @@ Exit codes: from __future__ import annotations import argparse +import os import json import re import socket import subprocess import sys +import time +from contextlib import contextmanager from dataclasses import dataclass, field from datetime import datetime from pathlib import Path @@ -72,6 +75,7 @@ VALID_WP_STATUSES = {"active", "completed", "archived"} VALID_TASK_STATUSES = {"todo", "in_progress", "blocked", "done", "cancelled"} VALID_TASK_PRIORITIES = {"low", "medium", "high", "critical"} VALID_DEP_RELATIONSHIPS = 
@contextmanager
def run_lock(name: str):
    """Hold a nonblocking, advisory process lock for a long-running mode.

    The lock file is ``custodian-<name>.lock`` inside the directory named by
    the ``CONSISTENCY_LOCK_DIR`` environment variable (default ``/tmp``).

    Yields:
        ``True`` when the lock was acquired (or when ``fcntl`` is unavailable
        on this platform, in which case the caller runs unguarded), ``False``
        when another process already holds the lock.

    While held, the file contains ``"<pid> <UTC ISO timestamp>Z"`` so an
    operator can see which process owns the sweep.
    """
    try:
        import fcntl
    except ImportError:
        # No flock() on this platform: degrade to "always acquired".
        yield True
        return

    # Local import: the module only imports ``datetime`` from ``datetime``.
    from datetime import timezone

    lock_path = Path(os.environ.get("CONSISTENCY_LOCK_DIR", "/tmp")) / f"custodian-{name}.lock"
    lock_path.parent.mkdir(parents=True, exist_ok=True)

    # Open WITHOUT truncating ("a+" creates the file if missing).  Mode "w"
    # would wipe the current holder's pid/timestamp record every time a
    # contender merely *tries* to take the lock.
    with lock_path.open("a+", encoding="utf-8") as handle:
        try:
            fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except BlockingIOError:
            # Someone else owns the lock; leave their record untouched.
            yield False
            return

        try:
            # We own the lock now, so it is safe to replace the stale record.
            handle.seek(0)
            handle.truncate()
            # Aware clock (utcnow() is deprecated); tzinfo is stripped so the
            # emitted text keeps the historical "<iso>Z" shape.
            stamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
            handle.write(f"{os.getpid()} {stamp}Z\n")
            handle.flush()
            yield True
        finally:
            try:
                fcntl.flock(handle, fcntl.LOCK_UN)
            except OSError:
                # Closing the handle releases the lock anyway.
                pass
@@ -1608,12 +1644,19 @@ def fix_all_remote( print("ERROR: Could not fetch repos from state-hub API", file=sys.stderr) return [] + started = time.monotonic() reports: list[ConsistencyReport] = [] skipped_clean: list[str] = [] skipped_missing: list[str] = [] + skipped_budget: list[str] = [] for repo in repos: slug = repo["slug"] + if max_seconds > 0 and time.monotonic() - started > max_seconds: + skipped_budget.append(slug) + skipped_budget.extend(r.get("slug", "?") for r in repos[repos.index(repo) + 1:]) + break + # Resolve path using the same priority as check_repo path = resolve_repo_path(repo) if not path or not Path(path).is_dir(): @@ -1646,7 +1689,12 @@ def fix_all_remote( print(f" CLEAN (skipped): {', '.join(skipped_clean)}") if skipped_missing: print(f" NOT ON THIS HOST (skipped): {', '.join(skipped_missing)}") - if skipped_clean or skipped_missing: + if skipped_budget: + print( + f" BUDGET EXHAUSTED after {max_seconds}s (skipped): " + f"{', '.join(skipped_budget)}" + ) + if skipped_clean or skipped_missing or skipped_budget: print() return reports @@ -1803,6 +1851,9 @@ def main() -> None: help="Pull each repo before fixing; when used with --all, skips repos " "that are already clean (no actionable issues, not behind remote). 
" "Implies --fix.") + parser.add_argument("--max-seconds", type=int, default=DEFAULT_REMOTE_ALL_MAX_SECONDS, + help="Wall-clock budget for --remote --all before remaining repos are skipped " + f"(default: {DEFAULT_REMOTE_ALL_MAX_SECONDS}; 0 disables)") parser.add_argument("--no-writeback", action="store_true", dest="no_writeback", help="Disable DB→file status writeback (C-15) while keeping other fixes") parser.add_argument("--archive-closed", action="store_true", @@ -1849,7 +1900,18 @@ def main() -> None: reports[0].fixes_applied.extend(f"archive: {m}" for m in moved) # --remote --all: smart pull+fix across all repos elif args.remote and args.all: - reports = fix_all_remote(args.api_base, no_writeback=no_wb) + with run_lock("consistency-remote-all") as acquired: + if not acquired: + print( + "SKIP: another fix-consistency-remote --all run is already active", + file=sys.stderr, + ) + sys.exit(0) + reports = fix_all_remote( + args.api_base, + no_writeback=no_wb, + max_seconds=args.max_seconds, + ) if not reports: sys.exit(0) else: @@ -1915,6 +1977,8 @@ def main() -> None: any_fail = any(r.failures for r in reports) any_warn = any(r.warnings for r in reports) + if args.remote and args.all and not any_fail: + sys.exit(0) sys.exit(1 if any_fail else 2 if any_warn else 0) diff --git a/scripts/push-seal.md b/scripts/push-seal.md index a296703..fcdf969 100644 --- a/scripts/push-seal.md +++ b/scripts/push-seal.md @@ -104,6 +104,10 @@ ExecStart=… consistency_check.py --remote --all 2. Skips repos that are already clean (no issues, not behind, not ahead) 3. For repos needing action: `git pull --ff-only` first, then `fix_repo()` (which ends with T04 push) +It also holds `/tmp/custodian-consistency-remote-all.lock` for the duration of +the sweep and defaults to a 300-second wall-clock budget. These guards keep a +slow or stalled sweep from overlapping with the next 15-minute timer activation. 
+ Previously `--all --fix` was used, which skipped the pull step and the clean-repo skip logic. ---