diff --git a/Makefile b/Makefile
index a378dc4..dd2bdd5 100644
--- a/Makefile
+++ b/Makefile
@@ -1,4 +1,4 @@
-.PHONY: install install-cli db db-tools migrate seed api dashboard check start clean register-project validate-adr add-domain rename-domain add-repo list-repos
+.PHONY: install install-cli db db-tools migrate seed api dashboard check start clean register-project validate-adr add-domain rename-domain add-repo list-repos cleanup-stale
 
 COMPOSE = docker compose -f infra/docker-compose.yml --env-file .env
 
@@ -89,5 +89,11 @@ validate-adr:
 	@test -n "$(REPO)" || (echo "ERROR: REPO is required. Usage: make validate-adr REPO= [DOMAIN=]"; exit 1)
 	uv run python scripts/validate_repo_adr.py "$(REPO)" $(if $(DOMAIN),--domain "$(DOMAIN)",)
 
+## Cancel open tasks belonging to completed/archived workstreams.
+## Safe to run at any time; also suitable for a daily cron job.
+## Cron example: 0 3 * * * cd ~/the-custodian/state-hub && make cleanup-stale
+cleanup-stale:
+	uv run python scripts/cleanup_stale_tasks.py
+
 clean:
 	$(COMPOSE) down -v
diff --git a/scripts/cleanup_stale_tasks.py b/scripts/cleanup_stale_tasks.py
new file mode 100644
index 0000000..6bca210
--- /dev/null
+++ b/scripts/cleanup_stale_tasks.py
@@ -0,0 +1,164 @@
+#!/usr/bin/env python3
+"""
+cleanup_stale_tasks.py — cancel tasks that are still open in completed/archived workstreams.
+
+Run manually: python3 scripts/cleanup_stale_tasks.py
+Run via make: make cleanup-stale
+Cron example: 0 3 * * * cd ~/the-custodian/state-hub && .venv/bin/python scripts/cleanup_stale_tasks.py
+
+Exit codes:
+  0 — ran successfully (zero or more tasks cancelled)
+  1 — API unreachable, unexpected error, or a task could not be cancelled
+"""
+
+import json
+import sys
+import urllib.error
+import urllib.parse
+import urllib.request
+from datetime import datetime, timezone
+
+API = "http://127.0.0.1:8000"
+TIMEOUT_SECONDS = 30  # keep unattended cron runs from hanging on a stuck socket
+TASK_LIMIT = 500  # maximum number of tasks requested from /tasks/
+MAX_REDIRECTS = 5
+STALE_STATUSES = {"todo", "in_progress", "blocked"}
+CLOSED_WS_STATUS = {"completed", "archived"}
+
+
+def get(path: str) -> list | dict:
+    """Fetch ``API + path`` and return the decoded JSON body."""
+    with urllib.request.urlopen(f"{API}{path}", timeout=TIMEOUT_SECONDS) as r:
+        return json.loads(r.read())
+
+
+def _request(method: str, url: str, payload: dict) -> dict:
+    """Send a JSON request, following 307/308 redirects with the same method.
+
+    urllib does not follow 307/308 automatically for methods other than
+    GET/HEAD (it raises HTTPError), so the redirect is followed by hand.
+
+    Raises urllib.error.HTTPError for any other HTTP error status and
+    RuntimeError when a redirect cannot be followed.
+    """
+    data = json.dumps(payload).encode()
+    for _ in range(MAX_REDIRECTS):
+        req = urllib.request.Request(
+            url,
+            data=data,
+            headers={"Content-Type": "application/json"},
+            method=method,
+        )
+        try:
+            with urllib.request.urlopen(req, timeout=TIMEOUT_SECONDS) as r:
+                body = r.read()
+        except urllib.error.HTTPError as e:
+            if e.code not in (307, 308):
+                raise
+            location = e.headers.get("Location")
+            if not location:
+                raise RuntimeError(f"Redirect without Location header for {url}") from e
+            # urljoin handles absolute, root-relative and path-relative targets.
+            url = urllib.parse.urljoin(url, location)
+            continue
+        # Tolerate an empty response body (e.g. 204 No Content).
+        return json.loads(body) if body.strip() else {}
+    raise RuntimeError(f"Too many redirects for {url}")
+
+
+def patch(path: str, payload: dict) -> dict:
+    """PATCH *payload* as JSON to ``API + path``."""
+    return _request("PATCH", f"{API}{path}", payload)
+
+
+def post(path: str, payload: dict) -> dict:
+    """POST *payload* as JSON to ``API + path``."""
+    return _request("POST", f"{API}{path}", payload)
+
+
+def main() -> int:
+    """Cancel stale tasks and return the process exit code (0 ok, 1 failure)."""
+    print(f"[cleanup-stale] {datetime.now(timezone.utc).isoformat(timespec='seconds')} — scanning…")
+
+    try:
+        tasks = get(f"/tasks/?limit={TASK_LIMIT}")
+        workstreams = get("/workstreams/")
+    except (urllib.error.URLError, TimeoutError) as e:
+        print(f"[cleanup-stale] ERROR: API unreachable — {e}", file=sys.stderr)
+        print("[cleanup-stale] Start the API with: cd ~/the-custodian/state-hub && make api", file=sys.stderr)
+        return 1
+
+    if len(tasks) >= TASK_LIMIT:
+        # The listing may have been truncated; later tasks would be missed silently.
+        print(
+            f"[cleanup-stale] WARNING: task list hit the limit of {TASK_LIMIT}; "
+            "some stale tasks may not have been scanned.",
+            file=sys.stderr,
+        )
+
+    closed_ws = {w["id"]: w for w in workstreams if w["status"] in CLOSED_WS_STATUS}
+
+    stale = [
+        t for t in tasks
+        if t["status"] in STALE_STATUSES
+        and t["workstream_id"] in closed_ws
+    ]
+
+    if not stale:
+        print("[cleanup-stale] Nothing to cancel — all open tasks belong to active workstreams.")
+        return 0
+
+    print(f"[cleanup-stale] Found {len(stale)} stale task(s) in completed/archived workstreams:")
+
+    cancelled = []
+    errors = []
+
+    for t in stale:
+        ws = closed_ws[t["workstream_id"]]
+        reason = (
+            f"Workstream '{ws['title']}' is {ws['status']}. "
+            f"Task was still '{t['status']}' at cleanup time. "
+            "See workplan closure review for actual outcome."
+        )
+        try:
+            patch(
+                f"/tasks/{t['id']}/",
+                {"status": "cancelled", "blocking_reason": reason},
+            )
+            cancelled.append(t)
+            print(f" cancelled [{t['priority']:8}] {t['title'][:70]}")
+        except Exception as e:  # best effort: record the failure and keep going
+            errors.append((t, str(e)))
+            print(f" ERROR {t['title'][:60]} — {e}", file=sys.stderr)
+
+    # Emit a single progress event summarising the run
+    if cancelled:
+        by_ws: dict[str, list] = {}
+        for t in cancelled:
+            by_ws.setdefault(closed_ws[t["workstream_id"]]["title"], []).append(t["title"])
+
+        summary = (
+            f"Stale-task cleanup: cancelled {len(cancelled)} task(s) "
+            f"across {len(by_ws)} completed/archived workstream(s)"
+        )
+        detail = {
+            "cancelled_count": len(cancelled),
+            "by_workstream": by_ws,
+            "error_count": len(errors),
+        }
+        try:
+            post("/progress/", {"summary": summary, "event_type": "cleanup", "detail": detail})
+            print("[cleanup-stale] Progress event recorded.")
+        except Exception as e:  # the cleanup itself succeeded; only warn
+            print(f"[cleanup-stale] WARNING: could not record progress event — {e}", file=sys.stderr)
+
+    if errors:
+        print(f"[cleanup-stale] Completed with {len(errors)} error(s).")
+        return 1
+
+    print(f"[cleanup-stale] Done. {len(cancelled)} task(s) cancelled.")
+    return 0
+
+
+if __name__ == "__main__":
+    sys.exit(main())