Load limiting safeguards

This commit is contained in:
2026-05-06 04:04:53 +02:00
parent 47f6971c56
commit 2484ed2815
22 changed files with 374 additions and 144 deletions

View File

@@ -4,6 +4,7 @@ from fastapi import APIRouter, Depends
from fastapi.responses import JSONResponse
from sqlalchemy import func, select, text
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import noload, selectinload
from api.database import get_session, engine
from api.flow_defs import assertion_result_to_dict, load_flow
@@ -33,7 +34,7 @@ from api.schemas.state import (
WorkstreamTotals,
)
from api.schemas.task import TaskRead
from api.schemas.topic import TopicWithWorkstreams
from api.schemas.topic import TopicRead, TopicWithWorkstreams
from api.schemas.workstream import WorkstreamRead, WorkstreamWithTaskCounts, WorkstreamWithDeps
from api.schemas.workstream_dependency import WorkstreamDepStub
from task_flow_engine import FlowEngine
@@ -47,9 +48,43 @@ async def get_summary(session: AsyncSession = Depends(get_session)) -> StateSumm
# AsyncSession does not support concurrent operations (no gather on same session).
topics_rows = await session.execute(
select(Topic).where(Topic.status != TopicStatus.archived).order_by(Topic.created_at)
select(Topic)
.options(
selectinload(Topic.domain),
noload(Topic.workstreams),
noload(Topic.decisions),
noload(Topic.progress_events),
)
.where(Topic.status != TopicStatus.archived)
.order_by(Topic.created_at)
)
topics = list(topics_rows.scalars().all())
topic_ids = [t.id for t in topics]
topic_workstreams: dict = {t.id: [] for t in topics}
if topic_ids:
topic_ws_rows = await session.execute(
select(
Workstream.topic_id,
Workstream.id,
Workstream.slug,
Workstream.title,
Workstream.status,
Workstream.owner,
Workstream.due_date,
)
.where(Workstream.topic_id.in_(topic_ids))
.order_by(Workstream.created_at)
)
for topic_id, ws_id, slug, title, status, owner, due_date in topic_ws_rows:
topic_workstreams.setdefault(topic_id, []).append({
"id": ws_id,
"slug": slug,
"title": title,
"status": status,
"owner": owner,
"due_date": due_date,
})
blocking_rows = await session.execute(
select(Decision)
@@ -60,17 +95,18 @@ async def get_summary(session: AsyncSession = Depends(get_session)) -> StateSumm
blocking = list(blocking_rows.scalars().all())
blocked_rows = await session.execute(
select(Task).where(Task.status == TaskStatus.blocked).order_by(Task.created_at)
select(Task).options(noload("*")).where(Task.status == TaskStatus.blocked).order_by(Task.created_at)
)
blocked = list(blocked_rows.scalars().all())
recent_rows = await session.execute(
select(ProgressEvent).order_by(ProgressEvent.created_at.desc()).limit(20)
select(ProgressEvent).options(noload("*")).order_by(ProgressEvent.created_at.desc()).limit(20)
)
recent = list(recent_rows.scalars().all())
open_ws_rows = await session.execute(
select(Workstream)
.options(noload("*"))
.where(Workstream.status.in_(["active", "blocked"]))
.order_by(Workstream.due_date.asc().nullslast(), Workstream.created_at)
)
@@ -78,10 +114,12 @@ async def get_summary(session: AsyncSession = Depends(get_session)) -> StateSumm
# Task counts per workstream (used to enrich open_workstreams)
task_per_ws: dict = {}
task_statuses_per_ws: dict = {}
for ws_id, tstat, cnt in await session.execute(
select(Task.workstream_id, Task.status, func.count()).group_by(Task.workstream_id, Task.status)
):
task_per_ws.setdefault(ws_id, {})[tstat] = cnt
task_statuses_per_ws.setdefault(ws_id, []).extend([_value(tstat)] * cnt)
# Dependency graph for open workstreams
open_ws_ids = [w.id for w in open_ws]
@@ -108,7 +146,7 @@ async def get_summary(session: AsyncSession = Depends(get_session)) -> StateSumm
extra_ids = dep_ws_ids - set(ws_lookup.keys())
if extra_ids:
extra_rows = await session.execute(
select(Workstream).where(Workstream.id.in_(extra_ids))
select(Workstream).options(noload("*")).where(Workstream.id.in_(extra_ids))
)
for w in extra_rows.scalars():
ws_lookup[w.id] = w
@@ -159,7 +197,7 @@ async def get_summary(session: AsyncSession = Depends(get_session)) -> StateSumm
flow_obj = {
"status": w.status,
"workstation": w.status,
"tasks": [{"status": _value(t.status)} for t in w.tasks],
"tasks": [{"status": status} for status in task_statuses_per_ws.get(w.id, [])],
"dependencies": [
{"workstation": ws_lookup[d.to_workstream_id].status}
for d in dep_rows
@@ -259,7 +297,13 @@ async def get_summary(session: AsyncSession = Depends(get_session)) -> StateSumm
return StateSummary(
generated_at=datetime.now(tz=timezone.utc),
totals=totals,
topics=[TopicWithWorkstreams.model_validate(t) for t in topics],
topics=[
TopicWithWorkstreams(
**TopicRead.model_validate(t).model_dump(),
workstreams=topic_workstreams.get(t.id, []),
)
for t in topics
],
blocking_decisions=[DecisionRead.model_validate(d) for d in blocking],
blocked_tasks=[TaskRead.model_validate(t) for t in blocked],
recent_progress=[ProgressEventRead.model_validate(e) for e in recent],
@@ -291,7 +335,7 @@ async def get_summary(session: AsyncSession = Depends(get_session)) -> StateSumm
async def _build_domain_summaries(session: AsyncSession) -> list[DomainSummary]:
"""Compute per-domain stats for the state summary."""
domains_rows = await session.execute(
select(Domain).where(Domain.status == "active").order_by(Domain.name)
select(Domain).options(noload("*")).where(Domain.status == "active").order_by(Domain.name)
)
domains = list(domains_rows.scalars().all())
@@ -357,14 +401,17 @@ async def _derive_next_steps(session: AsyncSession) -> list[NextStep]:
cutoff = datetime.now(tz=timezone.utc) - timedelta(days=7)
resolved_rows = await session.execute(
select(Decision)
.options(noload("*"))
.where(Decision.status == DecisionStatus.resolved)
.where(Decision.decided_at >= cutoff)
.where(Decision.workstream_id.isnot(None))
.order_by(Decision.decided_at.desc())
.limit(20)
)
for decision in resolved_rows.scalars().all():
open_tasks_rows = await session.execute(
select(Task)
.options(noload("*"))
.where(Task.workstream_id == decision.workstream_id)
.where(Task.status.in_([TaskStatus.todo, TaskStatus.in_progress]))
)
@@ -374,7 +421,7 @@ async def _derive_next_steps(session: AsyncSession) -> list[NextStep]:
task = min(open_tasks, key=lambda t: (_PRIORITY_RANK.get(t.priority, 99), t.created_at))
if task.id in seen_task_ids:
continue
ws = await session.get(Workstream, decision.workstream_id)
ws = await session.get(Workstream, decision.workstream_id, options=[noload("*")])
domain_slug = await _get_domain_slug_for_workstream(ws, session)
steps.append(NextStep(
type="resolved_decision",
@@ -392,57 +439,85 @@ async def _derive_next_steps(session: AsyncSession) -> list[NextStep]:
seen_task_ids.add(task.id)
# ── Signal 2: cleared dependencies ──────────────────────────────────────
all_dep_rows = await session.execute(select(WorkstreamDependency))
all_deps = list(all_dep_rows.scalars().all())
all_dep_rows = await session.execute(
select(
WorkstreamDependency.from_workstream_id,
WorkstreamDependency.to_workstream_id,
).where(WorkstreamDependency.to_workstream_id.isnot(None))
)
all_deps = all_dep_rows.all()
# Group from_workstream_id → set of to_workstream_ids
dep_map: dict = {}
for d in all_deps:
dep_map.setdefault(d.from_workstream_id, set()).add(d.to_workstream_id)
dep_ws_ids = set()
for from_ws_id, to_ws_id in all_deps:
dep_map.setdefault(from_ws_id, set()).add(to_ws_id)
dep_ws_ids.add(from_ws_id)
dep_ws_ids.add(to_ws_id)
for from_ws_id, to_ws_ids in dep_map.items():
# All targets must be completed
all_done = True
for to_id in to_ws_ids:
to_ws = await session.get(Workstream, to_id)
if to_ws is None or to_ws.status != "completed":
all_done = False
break
if not all_done:
continue
ws_info = {}
if dep_ws_ids:
ws_rows = await session.execute(
select(
Workstream.id,
Workstream.status,
Workstream.title,
Workstream.slug,
Workstream.topic_id,
).where(Workstream.id.in_(dep_ws_ids))
)
ws_info = {
ws_id: {
"status": status,
"title": title,
"slug": slug,
"topic_id": topic_id,
}
for ws_id, status, title, slug, topic_id in ws_rows
}
from_ws = await session.get(Workstream, from_ws_id)
if from_ws is None or from_ws.status not in ("active", "blocked"):
continue
ready_from_ws_ids = [
from_ws_id
for from_ws_id, to_ws_ids in dep_map.items()
if ws_info.get(from_ws_id, {}).get("status") in ("active", "blocked")
and all(ws_info.get(to_id, {}).get("status") == "completed" for to_id in to_ws_ids)
]
todo_by_ws: dict = {}
if ready_from_ws_ids:
todo_rows = await session.execute(
select(Task)
.where(Task.workstream_id == from_ws_id)
.options(noload("*"))
.where(Task.workstream_id.in_(ready_from_ws_ids))
.where(Task.status == TaskStatus.todo)
)
todo_tasks = list(todo_rows.scalars().all())
for task in todo_rows.scalars().all():
todo_by_ws.setdefault(task.workstream_id, []).append(task)
for from_ws_id in ready_from_ws_ids:
from_ws = ws_info.get(from_ws_id, {})
todo_tasks = todo_by_ws.get(from_ws_id, [])
if not todo_tasks:
continue
task = min(todo_tasks, key=lambda t: (_PRIORITY_RANK.get(t.priority, 99), t.created_at))
if task.id in seen_task_ids:
continue
domain_slug = await _get_domain_slug_for_workstream(from_ws, session)
domain_slug = await _get_domain_slug_for_topic(from_ws.get("topic_id"), session)
_blocker_slugs = []
for tid in to_ws_ids:
_ws = await session.get(Workstream, tid)
if _ws:
_blocker_slugs.append(_ws.slug)
for tid in dep_map[from_ws_id]:
if tid in ws_info:
_blocker_slugs.append(ws_info[tid]["slug"])
blocker_slugs = ", ".join(_blocker_slugs)
steps.append(NextStep(
type="dependency_cleared",
domain=domain_slug,
workstream_id=from_ws.id,
workstream_title=from_ws.title,
workstream_slug=from_ws.slug,
workstream_id=from_ws_id,
workstream_title=from_ws["title"],
workstream_slug=from_ws["slug"],
task_id=task.id,
task_title=task.title,
message=(
f"All dependencies of '{from_ws.title}' are completed ({blocker_slugs}) → "
f"All dependencies of '{from_ws['title']}' are completed ({blocker_slugs}) → "
f"'{task.title}' is ready to start"
),
))
@@ -455,10 +530,17 @@ async def _get_domain_slug_for_workstream(ws: Workstream | None, session: AsyncS
"""Get the domain slug for a workstream via its topic."""
if ws is None or ws.topic_id is None:
return None
topic = await session.get(Topic, ws.topic_id)
return await _get_domain_slug_for_topic(ws.topic_id, session)
async def _get_domain_slug_for_topic(topic_id, session: AsyncSession) -> str | None:
"""Get the domain slug for a topic id."""
if topic_id is None:
return None
topic = await session.get(Topic, topic_id, options=[noload("*")])
if topic is None or topic.domain_id is None:
return None
domain = await session.get(Domain, topic.domain_id)
domain = await session.get(Domain, topic.domain_id, options=[noload("*")])
return domain.slug if domain else None

View File

@@ -3,22 +3,24 @@ title: Capability Requests
---
```js
import {API} from "./components/config.js";
import {API, apiFetch, pollDelay, sleep} from "./components/config.js";
const POLL = 30_000;
```
```js
// Live poll for capability requests
const reqState = (async function*() {
let failures = 0;
while (true) {
let data = [], ok = false;
try {
const r = await fetch(`${API}/capability-requests/`);
const r = await apiFetch("/capability-requests/");
ok = r.ok;
data = ok ? await r.json() : [];
} catch {}
failures = ok ? 0 : failures + 1;
yield {data, ok, ts: new Date()};
await new Promise(res => setTimeout(res, POLL));
await sleep(pollDelay({ok, base: POLL, failures}));
}
})();
```
@@ -198,14 +200,17 @@ display(Inputs.table(filtered.map(r => ({
```js
// Live poll for catalog entries
const catalogState = (async function*() {
let failures = 0;
while (true) {
let data = [];
let data = [], ok = false;
try {
const r = await fetch(`${API}/capability-catalog/?status=all`);
const r = await apiFetch("/capability-catalog/?status=all");
ok = r.ok;
if (r.ok) data = await r.json();
} catch {}
failures = ok ? 0 : failures + 1;
yield data;
await new Promise(res => setTimeout(res, POLL));
await sleep(pollDelay({ok, base: POLL, failures}));
}
})();
```

View File

@@ -1,2 +1,27 @@
export const API = "http://127.0.0.1:8000";
export const POLL = 15_000;
export const POLL_HEAVY = 60_000;
export const POLL_HIDDEN = 120_000;
export const FETCH_TIMEOUT = 12_000;
export function pollDelay({ok = true, base = POLL, failures = 0} = {}) {
const hidden = typeof document !== "undefined" && document.visibilityState === "hidden";
const failureDelay = ok ? base : Math.min(base * 2 ** Math.min(failures, 4), 300_000);
return hidden ? Math.max(failureDelay, POLL_HIDDEN) : failureDelay;
}
export function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
export async function apiFetch(path, options = {}) {
const url = path.startsWith("http") ? path : `${API}${path}`;
const timeout = options.timeout ?? FETCH_TIMEOUT;
const ctrl = new AbortController();
const timer = setTimeout(() => ctrl.abort(), timeout);
try {
return await fetch(url, {...options, signal: ctrl.signal});
} finally {
clearTimeout(timer);
}
}

View File

@@ -3,22 +3,24 @@ title: Contributions
---
```js
import {API} from "./components/config.js";
import {API, apiFetch, pollDelay, sleep} from "./components/config.js";
const POLL = 30_000;
```
```js
// Live poll for contributions
const contribState = (async function*() {
let failures = 0;
while (true) {
let data = [], ok = false;
try {
const r = await fetch(`${API}/contributions/`);
const r = await apiFetch("/contributions/");
ok = r.ok;
data = ok ? await r.json() : [];
} catch {}
failures = ok ? 0 : failures + 1;
yield {data, ok, ts: new Date()};
await new Promise(res => setTimeout(res, POLL));
await sleep(pollDelay({ok, base: POLL, failures}));
}
})();
```

View File

@@ -3,18 +3,19 @@ title: Decisions
---
```js
import {API, POLL} from "./components/config.js";
import {API, POLL_HEAVY, apiFetch, pollDelay, sleep} from "./components/config.js";
```
```js
// Fetch decisions + topics (for domain context) in parallel
const decState = (async function*() {
let failures = 0;
while (true) {
let data = [], ok = false;
try {
const [rd, rt] = await Promise.all([
fetch(`${API}/decisions/?limit=500`),
fetch(`${API}/topics/`),
apiFetch("/decisions/?limit=500"),
apiFetch("/topics/"),
]);
ok = rd.ok && rt.ok;
if (ok) {
@@ -43,8 +44,9 @@ const decState = (async function*() {
});
}
} catch {}
failures = ok ? 0 : failures + 1;
yield {data, ok, ts: new Date()};
await new Promise(res => setTimeout(res, POLL));
await sleep(pollDelay({ok, base: POLL_HEAVY, failures}));
}
})();
```

View File

@@ -3,20 +3,21 @@ title: Dependencies
---
```js
import {API, POLL} from "./components/config.js";
import {API, POLL_HEAVY, apiFetch, pollDelay, sleep} from "./components/config.js";
```
```js
// Fetch workstreams + topics + summary (summary carries dep edges on open_workstreams)
const depState = (async function*() {
let failures = 0;
while (true) {
let wsMap = {}, edges = [], ok = false;
try {
const [rw, rto, rr, rs] = await Promise.all([
fetch(`${API}/workstreams/`),
fetch(`${API}/topics/`),
fetch(`${API}/repos/`),
fetch(`${API}/state/summary`),
apiFetch("/workstreams/"),
apiFetch("/topics/"),
apiFetch("/repos/"),
apiFetch("/state/summary", {timeout: 20_000}),
]);
ok = rw.ok && rto.ok && rr.ok && rs.ok;
if (ok) {
@@ -38,8 +39,9 @@ const depState = (async function*() {
}
}
} catch {}
failures = ok ? 0 : failures + 1;
yield {wsMap, edges, ok, ts: new Date()};
await new Promise(res => setTimeout(res, POLL));
await sleep(pollDelay({ok, base: POLL_HEAVY, failures}));
}
})();
```

View File

@@ -3,25 +3,27 @@ title: Domains
---
```js
import {API, POLL} from "./components/config.js";
import {API, POLL_HEAVY, apiFetch, pollDelay, sleep} from "./components/config.js";
```
```js
const domainsState = (async function*() {
let failures = 0;
while (true) {
let domains = [], repos = [], ok = false;
try {
const [rd, rr] = await Promise.all([
fetch(`${API}/domains/?status=all`),
fetch(`${API}/repos/`),
apiFetch("/domains/?status=all"),
apiFetch("/repos/"),
]);
ok = rd.ok && rr.ok;
if (ok) {
[domains, repos] = await Promise.all([rd.json(), rr.json()]);
}
} catch {}
failures = ok ? 0 : failures + 1;
yield {domains, repos, ok, ts: new Date()};
await new Promise(res => setTimeout(res, POLL));
await sleep(pollDelay({ok, base: POLL_HEAVY, failures}));
}
})();
```

View File

@@ -3,19 +3,20 @@ title: Extension Points
---
```js
import {API, POLL} from "./components/config.js";
import {API, POLL_HEAVY, apiFetch, pollDelay, sleep} from "./components/config.js";
```
```js
const epState = (async function*() {
let failures = 0;
while (true) {
let data = [], ok = false;
try {
const [re, rw, rt, rr] = await Promise.all([
fetch(`${API}/extension-points/`),
fetch(`${API}/workstreams/`),
fetch(`${API}/topics/`),
fetch(`${API}/repos/`),
apiFetch("/extension-points/"),
apiFetch("/workstreams/"),
apiFetch("/topics/"),
apiFetch("/repos/"),
]);
ok = re.ok && rw.ok && rt.ok && rr.ok;
if (ok) {
@@ -36,8 +37,9 @@ const epState = (async function*() {
});
}
} catch {}
failures = ok ? 0 : failures + 1;
yield {data, ok, ts: new Date()};
await new Promise(res => setTimeout(res, POLL));
await sleep(pollDelay({ok, base: POLL_HEAVY, failures}));
}
})();
```

View File

@@ -3,20 +3,21 @@ title: Goals
---
```js
import {API} from "./components/config.js";
import {API, apiFetch, pollDelay, sleep} from "./components/config.js";
const POLL = 20_000;
```
```js
const goalsState = (async function*() {
let failures = 0;
while (true) {
let domains = [], domainGoals = [], repoGoals = [], repos = [], ok = false;
try {
const [rd, rdg, rrg, rr] = await Promise.all([
fetch(`${API}/domains/?status=active`),
fetch(`${API}/domain-goals/`),
fetch(`${API}/repo-goals/`),
fetch(`${API}/repos/`),
apiFetch("/domains/?status=active"),
apiFetch("/domain-goals/"),
apiFetch("/repo-goals/"),
apiFetch("/repos/"),
]);
ok = rd.ok && rdg.ok && rrg.ok && rr.ok;
if (ok) {
@@ -25,8 +26,9 @@ const goalsState = (async function*() {
]);
}
} catch {}
failures = ok ? 0 : failures + 1;
yield {domains, domainGoals, repoGoals, repos, ok, ts: new Date()};
await new Promise(res => setTimeout(res, POLL));
await sleep(pollDelay({ok, base: POLL, failures}));
}
})();
```

View File

@@ -3,21 +3,23 @@ title: Agent Inbox
---
```js
import {API, POLL} from "./components/config.js";
import {API, apiFetch, pollDelay, sleep} from "./components/config.js";
```
```js
// Live poll: messages list
const inboxState = (async function*() {
let failures = 0;
while (true) {
let messages = [], ok = false;
try {
const resp = await fetch(`${API}/messages/?limit=100`);
const resp = await apiFetch("/messages/?limit=100");
ok = resp.ok;
if (ok) messages = await resp.json();
} catch {}
failures = ok ? 0 : failures + 1;
yield {messages, ok, ts: new Date()};
await new Promise(res => setTimeout(res, POLL));
await sleep(pollDelay({ok, failures}));
}
})();
```

View File

@@ -3,23 +3,25 @@ title: Overview
---
```js
import {API, POLL} from "./components/config.js";
import {API, POLL, POLL_HEAVY, apiFetch, pollDelay, sleep} from "./components/config.js";
```
```js
// Live polling — yields {data, ok, ts} every POLL ms
// Live polling — yields {data, ok, ts}; backs off when the API is slow/offline.
const summaryState = (async function*() {
let failures = 0;
while (true) {
let data, ok = false;
try {
const r = await fetch(`${API}/state/summary`);
const r = await apiFetch("/state/summary", {timeout: 20_000});
ok = r.ok;
data = ok ? await r.json() : {error: `HTTP ${r.status}`};
} catch (e) {
data = {error: "API unreachable"};
}
failures = ok ? 0 : failures + 1;
yield {data, ok, ts: new Date()};
await new Promise(res => setTimeout(res, POLL));
await sleep(pollDelay({ok, base: POLL_HEAVY, failures}));
}
})();
```
@@ -49,17 +51,20 @@ refreshDecisions();
```js
// SBOM snapshots — repo coverage and total package count
const sbomSnapState = (async function*() {
let failures = 0;
while (true) {
let snapshots = [], totalPkgs = 0;
let snapshots = [], totalPkgs = 0, ok = false;
try {
const r = await fetch(`${API}/sbom/snapshots/`);
const r = await apiFetch("/sbom/snapshots/");
ok = r.ok;
if (r.ok) {
snapshots = await r.json();
totalPkgs = snapshots.reduce((s, sn) => s + (sn.entry_count ?? 0), 0);
}
} catch {}
failures = ok ? 0 : failures + 1;
yield {snapshots, totalPkgs};
await new Promise(res => setTimeout(res, POLL));
await sleep(pollDelay({ok, base: POLL_HEAVY, failures}));
}
})();
```
@@ -67,17 +72,20 @@ const sbomSnapState = (async function*() {
```js
// Registered projects — milestone events tagged with registration
const regsState = (async function*() {
let failures = 0;
while (true) {
let rows = [];
let rows = [], ok = false;
try {
const r = await fetch(`${API}/progress/?event_type=milestone&limit=500`);
const r = await apiFetch("/progress/?event_type=milestone&limit=500");
ok = r.ok;
if (r.ok) {
const all = await r.json();
rows = all.filter(e => e.summary?.startsWith("Project registered with State Hub:"));
}
} catch {}
failures = ok ? 0 : failures + 1;
yield rows;
await new Promise(res => setTimeout(res, POLL));
await sleep(pollDelay({ok, base: POLL_HEAVY, failures}));
}
})();
```
@@ -85,15 +93,16 @@ const regsState = (async function*() {
```js
// All-workstreams + all-tasks poll — drives the multi-mode chart
const wsChartState = (async function*() {
let failures = 0;
while (true) {
let wsAll = [], ok = false;
try {
const [rw, rt, rto, rr, rwi] = await Promise.all([
fetch(`${API}/workstreams/`),
fetch(`${API}/tasks/?limit=2000`),
fetch(`${API}/topics/`),
fetch(`${API}/repos/`),
fetch(`${API}/workstreams/workplan-index`),
apiFetch("/workstreams/"),
apiFetch("/tasks/?limit=2000"),
apiFetch("/topics/"),
apiFetch("/repos/"),
apiFetch("/workstreams/workplan-index"),
]);
ok = rw.ok && rt.ok && rto.ok && rr.ok;
if (ok) {
@@ -132,8 +141,9 @@ const wsChartState = (async function*() {
});
}
} catch {}
failures = ok ? 0 : failures + 1;
yield {wsAll, ok};
await new Promise(res => setTimeout(res, POLL));
await sleep(pollDelay({ok, base: POLL_HEAVY, failures}));
}
})();
```

View File

@@ -3,20 +3,21 @@ title: Interventions
---
```js
import {API, POLL} from "./components/config.js";
import {API, POLL_HEAVY, apiFetch, pollDelay, sleep} from "./components/config.js";
```
```js
// Live poll: all tasks (filtered client-side) + workstreams + topics
const interventionState = (async function*() {
let failures = 0;
while (true) {
let tasks = [], wsMap = {}, ok = false;
try {
const [rt, rw, rto, rr] = await Promise.all([
fetch(`${API}/tasks/?limit=500`),
fetch(`${API}/workstreams/`),
fetch(`${API}/topics/`),
fetch(`${API}/repos/`),
apiFetch("/tasks/?limit=500"),
apiFetch("/workstreams/"),
apiFetch("/topics/"),
apiFetch("/repos/"),
]);
ok = rt.ok && rw.ok && rto.ok && rr.ok;
if (ok) {
@@ -36,8 +37,9 @@ const interventionState = (async function*() {
}));
}
} catch {}
failures = ok ? 0 : failures + 1;
yield {tasks, ok, ts: new Date()};
await new Promise(res => setTimeout(res, POLL));
await sleep(pollDelay({ok, base: POLL_HEAVY, failures}));
}
})();
```

View File

@@ -3,24 +3,26 @@ title: Progress
---
```js
import {API, POLL} from "./components/config.js";
import {POLL_HEAVY, apiFetch, pollDelay, sleep} from "./components/config.js";
```
```js
const progState = (async function*() {
let failures = 0;
while (true) {
let data = [], tokenEvents = [], ok = false;
try {
const [r1, r2] = await Promise.all([
fetch(`${API}/progress/?limit=500`),
fetch(`${API}/token-events/?limit=1000`),
apiFetch("/progress/?limit=500"),
apiFetch("/token-events/?limit=1000"),
]);
ok = r1.ok;
data = ok ? await r1.json() : [];
tokenEvents = r2.ok ? await r2.json() : [];
} catch {}
failures = ok ? 0 : failures + 1;
yield {data, tokenEvents, ok, ts: new Date()};
await new Promise(res => setTimeout(res, POLL));
await sleep(pollDelay({ok, base: POLL_HEAVY, failures}));
}
})();
```

View File

@@ -3,19 +3,20 @@ title: Tasks
---
```js
import {API, POLL} from "./components/config.js";
import {API, POLL_HEAVY, apiFetch, pollDelay, sleep} from "./components/config.js";
```
```js
const taskState = (async function*() {
let failures = 0;
while (true) {
let data = [], ok = false;
try {
const [rt, rw, rto, rr] = await Promise.all([
fetch(`${API}/tasks/?limit=500`),
fetch(`${API}/workstreams/`),
fetch(`${API}/topics/`),
fetch(`${API}/repos/`),
apiFetch("/tasks/?limit=500"),
apiFetch("/workstreams/"),
apiFetch("/topics/"),
apiFetch("/repos/"),
]);
ok = rt.ok && rw.ok && rto.ok && rr.ok;
if (ok) {
@@ -33,8 +34,9 @@ const taskState = (async function*() {
}));
}
} catch {}
failures = ok ? 0 : failures + 1;
yield {data, ok, ts: new Date()};
await new Promise(res => setTimeout(res, POLL));
await sleep(pollDelay({ok, base: POLL_HEAVY, failures}));
}
})();
```

View File

@@ -3,19 +3,20 @@ title: Technical Debt
---
```js
import {API, POLL} from "./components/config.js";
import {API, POLL_HEAVY, apiFetch, pollDelay, sleep} from "./components/config.js";
```
```js
const tdState = (async function*() {
let failures = 0;
while (true) {
let data = [], ok = false;
try {
const [rt, rw, rto, rr] = await Promise.all([
fetch(`${API}/technical-debt/`),
fetch(`${API}/workstreams/`),
fetch(`${API}/topics/`),
fetch(`${API}/repos/`),
apiFetch("/technical-debt/"),
apiFetch("/workstreams/"),
apiFetch("/topics/"),
apiFetch("/repos/"),
]);
ok = rt.ok && rw.ok && rto.ok && rr.ok;
if (ok) {
@@ -36,8 +37,9 @@ const tdState = (async function*() {
});
}
} catch {}
failures = ok ? 0 : failures + 1;
yield {data, ok, ts: new Date()};
await new Promise(res => setTimeout(res, POLL));
await sleep(pollDelay({ok, base: POLL_HEAVY, failures}));
}
})();
```

View File

@@ -3,23 +3,24 @@ title: Todo
---
```js
import {API, POLL} from "./components/config.js";
import {API, POLL_HEAVY, apiFetch, pollDelay, sleep} from "./components/config.js";
const THIS_REPO = "the-custodian";
```
```js
// Live poll: tasks + workstreams + topics + contributions
const todoState = (async function*() {
let failures = 0;
while (true) {
let tasks = [], contribs = [], improvements = [], wsMap = {}, ok = false;
try {
const [rt, rw, rto, rr, rc, ri] = await Promise.all([
fetch(`${API}/tasks/?limit=500`),
fetch(`${API}/workstreams/`),
fetch(`${API}/topics/`),
fetch(`${API}/repos/`),
fetch(`${API}/contributions/`),
fetch(`${API}/technical-debt/?debt_type=dashboard-improvement`),
apiFetch("/tasks/?limit=500"),
apiFetch("/workstreams/"),
apiFetch("/topics/"),
apiFetch("/repos/"),
apiFetch("/contributions/"),
apiFetch("/technical-debt/?debt_type=dashboard-improvement"),
]);
ok = rt.ok && rw.ok && rto.ok && rr.ok && rc.ok;
if (ok) {
@@ -42,8 +43,9 @@ const todoState = (async function*() {
improvements = ri.ok ? (await ri.json()).filter(t => t.debt_type === "dashboard-improvement" && !CLOSED.has(t.status)) : [];
}
} catch {}
failures = ok ? 0 : failures + 1;
yield {tasks, contribs, improvements, ok, ts: new Date()};
await new Promise(res => setTimeout(res, POLL));
await sleep(pollDelay({ok, base: POLL_HEAVY, failures}));
}
})();
```

View File

@@ -3,7 +3,7 @@ title: Token Cost
---
```js
import {API} from "./components/config.js";
import {apiFetch, pollDelay, sleep} from "./components/config.js";
import {refCell} from "./components/ref-cell.js";
const POLL = 60_000;
```
@@ -11,14 +11,15 @@ const POLL = 60_000;
```js
// Fetch token events, by-repo summary, workstreams, and tasks in parallel
const tokenState = (async function*() {
let failures = 0;
while (true) {
let byRepo = [], events = [], wsMap = {}, taskMap = {}, ok = false;
try {
const [r1, r2, r3, r4] = await Promise.all([
fetch(`${API}/token-events/by-repo/`),
fetch(`${API}/token-events/?limit=1000`),
fetch(`${API}/workstreams/`),
fetch(`${API}/tasks/`),
apiFetch("/token-events/by-repo/"),
apiFetch("/token-events/?limit=1000"),
apiFetch("/workstreams/"),
apiFetch("/tasks/"),
]);
ok = r1.ok && r2.ok;
if (ok) {
@@ -34,8 +35,9 @@ const tokenState = (async function*() {
for (const t of taskList) taskMap[t.id] = t;
}
} catch {}
failures = ok ? 0 : failures + 1;
yield {byRepo, events, wsMap, taskMap, ok, ts: new Date()};
await new Promise(res => setTimeout(res, POLL));
await sleep(pollDelay({ok, base: POLL, failures}));
}
})();
```

View File

@@ -3,7 +3,7 @@ title: UI Feedback
---
```js
import {API, POLL} from "./components/config.js";
import {API, POLL_HEAVY, apiFetch, pollDelay, sleep} from "./components/config.js";
```
```js
@@ -47,10 +47,11 @@ function nextStep(current) {
```js
const feedbackState = (async function*() {
let failures = 0;
while (true) {
let data = [], ok = false;
try {
const r = await fetch(`${API}/technical-debt/?debt_type=dashboard-improvement`);
const r = await apiFetch("/technical-debt/?debt_type=dashboard-improvement");
ok = r.ok;
if (ok) {
const items = await r.json();
@@ -62,8 +63,9 @@ const feedbackState = (async function*() {
});
}
} catch {}
failures = ok ? 0 : failures + 1;
yield {data, ok, ts: new Date()};
await new Promise(res => setTimeout(res, POLL));
await sleep(pollDelay({ok, base: POLL_HEAVY, failures}));
}
})();
```

View File

@@ -3,20 +3,21 @@ title: Workstreams
---
```js
import {API, POLL} from "./components/config.js";
import {API, POLL_HEAVY, apiFetch, pollDelay, sleep} from "./components/config.js";
```
```js
// Fetch workstreams + topics + summary (for dep graph) in parallel
const wsState = (async function*() {
let failures = 0;
while (true) {
let data = [], openWs = [], ok = false;
try {
const [rw, rt, rr, rs] = await Promise.all([
fetch(`${API}/workstreams/`),
fetch(`${API}/topics/`),
fetch(`${API}/repos/`),
fetch(`${API}/state/summary`),
apiFetch("/workstreams/"),
apiFetch("/topics/"),
apiFetch("/repos/"),
apiFetch("/state/summary", {timeout: 20_000}),
]);
ok = rw.ok && rt.ok && rr.ok && rs.ok;
if (ok) {
@@ -32,8 +33,9 @@ const wsState = (async function*() {
openWs = summary.open_workstreams ?? [];
}
} catch {}
failures = ok ? 0 : failures + 1;
yield {data, openWs, ok, ts: new Date()};
await new Promise(res => setTimeout(res, POLL));
await sleep(pollDelay({ok, base: POLL_HEAVY, failures}));
}
})();
```

View File

@@ -17,9 +17,19 @@ The compose file is `infra/docker-compose.yml`. Copy `.env.example` to `.env` an
## Periodic Repo Sync — systemd user timer
The custodian sync timer runs `fix-consistency-all` every 15 minutes, keeping
workplan file state in sync with the state-hub DB automatically (belt-and-suspenders
alongside the per-repo git post-commit hooks).
The custodian sync timer runs `consistency_check.py --remote --all` every 15
minutes, keeping workplan file state in sync with the state-hub DB automatically
(belt-and-suspenders alongside the per-repo git post-commit hooks).
The all-repo remote sweep has two built-in load guards:
- A nonblocking process lock at `/tmp/custodian-consistency-remote-all.lock`;
if a prior sweep is still active, the next timer run exits cleanly.
- A wall-clock budget, defaulting to 300 seconds. Remaining repos are skipped
once the budget is exhausted. Override with `--max-seconds N` or set
`CONSISTENCY_REMOTE_ALL_MAX_SECONDS`.
- Warn-only sweeps exit 0 in `--remote --all` mode so the systemd unit only
goes failed for hard consistency failures.
### Installed unit files
@@ -64,7 +74,7 @@ If systemd is not available, fall back to crontab:
```bash
# Crontab fallback (run crontab -e and add):
*/15 * * * * curl -sf http://127.0.0.1:8000/state/health && cd ~/the-custodian/state-hub && .venv/bin/python scripts/consistency_check.py --all --fix >> /tmp/custodian-sync.log 2>&1
*/15 * * * * curl -sf http://127.0.0.1:8000/state/health && cd ~/the-custodian/state-hub && .venv/bin/python scripts/consistency_check.py --remote --all >> /tmp/custodian-sync.log 2>&1
```
---

View File

@@ -38,11 +38,14 @@ Exit codes:
from __future__ import annotations
import argparse
import os
import json
import re
import socket
import subprocess
import sys
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
@@ -72,6 +75,7 @@ VALID_WP_STATUSES = {"active", "completed", "archived"}
VALID_TASK_STATUSES = {"todo", "in_progress", "blocked", "done", "cancelled"}
VALID_TASK_PRIORITIES = {"low", "medium", "high", "critical"}
VALID_DEP_RELATIONSHIPS = {"blocks", "starts_after", "informs", "soft_dependency"}
DEFAULT_REMOTE_ALL_MAX_SECONDS = int(os.environ.get("CONSISTENCY_REMOTE_ALL_MAX_SECONDS", "300"))
# Workplan files use task-style vocabulary ("done"); the DB workstream API uses
# "completed". This map translates file values to DB values before comparison
@@ -161,6 +165,37 @@ class ConsistencyReport:
return [i for i in self.issues if i.severity == "INFO"]
@contextmanager
def run_lock(name: str):
"""Hold a nonblocking process lock for long-running consistency modes."""
try:
import fcntl
except ImportError:
yield True
return
lock_path = Path(os.environ.get("CONSISTENCY_LOCK_DIR", "/tmp")) / f"custodian-{name}.lock"
lock_path.parent.mkdir(parents=True, exist_ok=True)
handle = lock_path.open("w", encoding="utf-8")
try:
try:
fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
except BlockingIOError:
yield False
return
handle.seek(0)
handle.truncate()
handle.write(f"{os.getpid()} {datetime.utcnow().isoformat()}Z\n")
handle.flush()
yield True
finally:
try:
fcntl.flock(handle, fcntl.LOCK_UN)
except OSError:
pass
handle.close()
# ---------------------------------------------------------------------------
# YAML / frontmatter parsing
# ---------------------------------------------------------------------------
@@ -1591,6 +1626,7 @@ def _report_needs_action(
def fix_all_remote(
api_base: str,
no_writeback: bool = False,
max_seconds: int = DEFAULT_REMOTE_ALL_MAX_SECONDS,
) -> list[ConsistencyReport]:
"""Pull-then-fix all registered repos that need attention.
@@ -1608,12 +1644,19 @@ def fix_all_remote(
print("ERROR: Could not fetch repos from state-hub API", file=sys.stderr)
return []
started = time.monotonic()
reports: list[ConsistencyReport] = []
skipped_clean: list[str] = []
skipped_missing: list[str] = []
skipped_budget: list[str] = []
for repo in repos:
slug = repo["slug"]
if max_seconds > 0 and time.monotonic() - started > max_seconds:
skipped_budget.append(slug)
skipped_budget.extend(r.get("slug", "?") for r in repos[repos.index(repo) + 1:])
break
# Resolve path using the same priority as check_repo
path = resolve_repo_path(repo)
if not path or not Path(path).is_dir():
@@ -1646,7 +1689,12 @@ def fix_all_remote(
print(f" CLEAN (skipped): {', '.join(skipped_clean)}")
if skipped_missing:
print(f" NOT ON THIS HOST (skipped): {', '.join(skipped_missing)}")
if skipped_clean or skipped_missing:
if skipped_budget:
print(
f" BUDGET EXHAUSTED after {max_seconds}s (skipped): "
f"{', '.join(skipped_budget)}"
)
if skipped_clean or skipped_missing or skipped_budget:
print()
return reports
@@ -1803,6 +1851,9 @@ def main() -> None:
help="Pull each repo before fixing; when used with --all, skips repos "
"that are already clean (no actionable issues, not behind remote). "
"Implies --fix.")
parser.add_argument("--max-seconds", type=int, default=DEFAULT_REMOTE_ALL_MAX_SECONDS,
help="Wall-clock budget for --remote --all before remaining repos are skipped "
f"(default: {DEFAULT_REMOTE_ALL_MAX_SECONDS}; 0 disables)")
parser.add_argument("--no-writeback", action="store_true", dest="no_writeback",
help="Disable DB→file status writeback (C-15) while keeping other fixes")
parser.add_argument("--archive-closed", action="store_true",
@@ -1849,7 +1900,18 @@ def main() -> None:
reports[0].fixes_applied.extend(f"archive: {m}" for m in moved)
# --remote --all: smart pull+fix across all repos
elif args.remote and args.all:
reports = fix_all_remote(args.api_base, no_writeback=no_wb)
with run_lock("consistency-remote-all") as acquired:
if not acquired:
print(
"SKIP: another fix-consistency-remote --all run is already active",
file=sys.stderr,
)
sys.exit(0)
reports = fix_all_remote(
args.api_base,
no_writeback=no_wb,
max_seconds=args.max_seconds,
)
if not reports:
sys.exit(0)
else:
@@ -1915,6 +1977,8 @@ def main() -> None:
any_fail = any(r.failures for r in reports)
any_warn = any(r.warnings for r in reports)
if args.remote and args.all and not any_fail:
sys.exit(0)
sys.exit(1 if any_fail else 2 if any_warn else 0)

View File

@@ -104,6 +104,10 @@ ExecStart=… consistency_check.py --remote --all
2. Skips repos that are already clean (no issues, not behind, not ahead)
3. For repos needing action: `git pull --ff-only` first, then `fix_repo()` (which ends with T04 push)
It also holds `/tmp/custodian-consistency-remote-all.lock` for the duration of
the sweep and defaults to a 300-second wall-clock budget. These guards keep a
slow or stalled sweep from overlapping with the next 15-minute timer activation.
Previously `--all --fix` was used, which skipped the pull step and the clean-repo skip logic.
---