#!/usr/bin/env python3 """ cleanup_stale_tasks.py — cancel tasks that are still open in completed/archived workstreams. Run manually: python3 scripts/cleanup_stale_tasks.py Run via make: make cleanup-stale Cron example: 0 3 * * * cd ~/state-hub && .venv/bin/python scripts/cleanup_stale_tasks.py Exit codes: 0 — ran successfully (zero or more tasks cancelled) 1 — API unreachable or unexpected error """ import asyncio import json import os import sys import urllib.error import urllib.request from datetime import datetime, timezone # Make the api package importable when running as `python scripts/cleanup_stale_tasks.py` sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) try: from api.events import EventEnvelope, publish_event, shutdown_publisher except Exception: # pragma: no cover — event publishing is optional EventEnvelope = None # type: ignore[assignment] publish_event = None # type: ignore[assignment] shutdown_publisher = None # type: ignore[assignment] API = "http://127.0.0.1:8000" STALE_STATUSES = {"todo", "in_progress", "blocked"} CLOSED_WS_STATUS = {"completed", "archived"} def get(path: str) -> list | dict: with urllib.request.urlopen(f"{API}{path}") as r: return json.loads(r.read()) def _request(method: str, url: str, payload: dict) -> dict: """Send a JSON request, following 307/308 redirects with the same method.""" data = json.dumps(payload).encode() for _ in range(5): # max redirects req = urllib.request.Request( url, data=data, headers={"Content-Type": "application/json"}, method=method, ) try: with urllib.request.urlopen(req) as r: return json.loads(r.read()) except urllib.error.HTTPError as e: if e.code in (307, 308): url = e.headers.get("Location", url) if not url.startswith("http"): url = API + url else: raise raise RuntimeError(f"Too many redirects for {url}") def patch(path: str, payload: dict) -> dict: return _request("PATCH", f"{API}{path}", payload) def post(path: str, payload: dict) -> dict: return _request("POST", f"{API}{path}", payload) def main() -> int: print(f"[cleanup-stale] {datetime.now(timezone.utc).isoformat(timespec='seconds')} — scanning…") try: tasks = get("/tasks/?limit=500") workstreams = get("/workstreams/") except urllib.error.URLError as e: print(f"[cleanup-stale] ERROR: API unreachable — {e}", file=sys.stderr) print("[cleanup-stale] Start the API with: cd ~/state-hub && make api", file=sys.stderr) return 1 closed_ws = {w["id"]: w for w in workstreams if w["status"] in CLOSED_WS_STATUS} stale = [ t for t in tasks if t["status"] in STALE_STATUSES and t["workstream_id"] in closed_ws ] if not stale: print("[cleanup-stale] Nothing to cancel — all open tasks belong to active workstreams.") return 0 print(f"[cleanup-stale] Found {len(stale)} stale task(s) in completed/archived workstreams:") cancelled = [] errors = [] nats_events: list[tuple[str, "EventEnvelope"]] = [] for t in stale: ws = closed_ws[t["workstream_id"]] reason = ( f"Workstream '{ws['title']}' is {ws['status']}. " f"Task was still '{t['status']}' at cleanup time. " f"See workplan closure review for actual outcome." ) try: patch( f"/tasks/{t['id']}/", {"status": "cancelled", "blocking_reason": reason}, ) cancelled.append(t) print(f" cancelled [{t['priority']:8}] {t['title'][:70]}") if EventEnvelope is not None: subject = "org.statehub.task.stale" nats_events.append(( subject, EventEnvelope.new( subject, attributes={ "task_id": t["id"], "workstream_id": t["workstream_id"], "workstream_status": ws["status"], "task_title": t["title"], "task_status_before": t["status"], }, ), )) except Exception as e: errors.append((t, str(e))) print(f" ERROR {t['title'][:60]} — {e}", file=sys.stderr) if nats_events and publish_event is not None and shutdown_publisher is not None: async def _flush_events() -> None: for subject, env in nats_events: await publish_event(subject, env) await shutdown_publisher() try: asyncio.run(_flush_events()) except Exception as e: # pragma: no cover — publishing is best-effort print(f"[cleanup-stale] WARNING: NATS publish failed — {e}", file=sys.stderr) # Emit a single progress event summarising the run if cancelled: by_ws: dict[str, list] = {} for t in cancelled: by_ws.setdefault(closed_ws[t["workstream_id"]]["title"], []).append(t["title"]) summary = ( f"Stale-task cleanup: cancelled {len(cancelled)} task(s) " f"across {len(by_ws)} completed workstream(s)" ) detail = { "cancelled_count": len(cancelled), "by_workstream": {ws: titles for ws, titles in by_ws.items()}, "error_count": len(errors), } try: post("/progress/", {"summary": summary, "event_type": "cleanup", "detail": detail}) print(f"[cleanup-stale] Progress event recorded.") except Exception as e: print(f"[cleanup-stale] WARNING: could not record progress event — {e}", file=sys.stderr) if errors: print(f"[cleanup-stale] Completed with {len(errors)} error(s).") return 1 print(f"[cleanup-stale] Done. {len(cancelled)} task(s) cancelled.") return 0 if __name__ == "__main__": sys.exit(main())