diff --git a/api/routers/consistency_sweep.py b/api/routers/consistency_sweep.py index b0879f5..bd1d7e3 100644 --- a/api/routers/consistency_sweep.py +++ b/api/routers/consistency_sweep.py @@ -25,7 +25,11 @@ async def sweep_remote_all( session: AsyncSession = Depends(get_session), ) -> ConsistencySweepRemoteAllRun: try: - return await run_remote_all_sweep(session, max_seconds=body.max_seconds) + return await run_remote_all_sweep( + session, + max_seconds=body.max_seconds, + source=body.source, + ) except json.JSONDecodeError as exc: raise HTTPException( status_code=500, diff --git a/api/schemas/consistency_sweep.py b/api/schemas/consistency_sweep.py index b051176..083995c 100644 --- a/api/schemas/consistency_sweep.py +++ b/api/schemas/consistency_sweep.py @@ -27,12 +27,17 @@ class ConsistencySweepRemoteAllGenerate(BaseModel): le=3600, description="Wall-clock budget for the remote-all sweep (0 disables)", ) + source: str = Field( + default="api", + description="Runner label stored on progress events (local-timer, activity-core, api)", + ) class ConsistencySweepRemoteAllRun(BaseModel): started_at: datetime completed_at: datetime max_seconds: int + source: str exit_code: int lock_skipped: bool repos_processed: list[ConsistencySweepRepoResult] = Field(default_factory=list) diff --git a/api/services/consistency_sweep.py b/api/services/consistency_sweep.py index 7bafaf5..1943e36 100644 --- a/api/services/consistency_sweep.py +++ b/api/services/consistency_sweep.py @@ -8,6 +8,7 @@ import sys import uuid from datetime import UTC, datetime from pathlib import Path +from typing import Any from sqlalchemy.ext.asyncio import AsyncSession @@ -95,6 +96,7 @@ async def run_remote_all_sweep( session: AsyncSession, *, max_seconds: int, + source: str = "api", ) -> ConsistencySweepRemoteAllRun: started_at = datetime.now(tz=UTC) cmd = [ @@ -124,6 +126,7 @@ async def run_remote_all_sweep( started_at=started_at, completed_at=completed_at, max_seconds=max_seconds, + source=source, exit_code=result.returncode, lock_skipped=lock_skipped, repos_processed=repos_processed, @@ -133,6 +136,7 @@ async def run_remote_all_sweep( started_at=started_at, completed_at=completed_at, max_seconds=max_seconds, + source=source, exit_code=result.returncode, lock_skipped=lock_skipped, repos_processed=repos_processed, @@ -149,6 +153,7 @@ async def _log_sweep_progress( started_at: datetime, completed_at: datetime, max_seconds: int, + source: str, exit_code: int, lock_skipped: bool, repos_processed: list[ConsistencySweepRepoResult], @@ -175,6 +180,7 @@ async def _log_sweep_progress( "started_at": _iso(started_at), "completed_at": _iso(completed_at), "max_seconds": max_seconds, + "source": source, "exit_code": exit_code, "lock_skipped": lock_skipped, "repos_processed": [item.model_dump(mode="json") for item in repos_processed], diff --git a/docs/consistency-sweep-runbook.md b/docs/consistency-sweep-runbook.md index ca226d3..ca93790 100644 --- a/docs/consistency-sweep-runbook.md +++ b/docs/consistency-sweep-runbook.md @@ -65,7 +65,7 @@ Expected definition: - trigger: `*/15 * * * *` - timezone: `UTC` - misfire policy: `skip` -- enabled: `false` until manual canary passes, then `true` after cutover +- enabled: `true` during parallel week (T03); local timer retired after T04 ## Progress Event Check @@ -113,6 +113,27 @@ Before enabling the cluster schedule: 3. Verify the progress event and ActivityRun context snapshot. 4. Confirm idempotence when the local timer also fires (lock skip is OK). +## Parallel week observability (T03) + +Both runners call the same API and tag progress events with `detail.source`: + +| Source | Runner | +|--------|--------| +| `local-timer` | `custodian-sync.timer` on the workstation | +| `activity-core` | Railiance01 Temporal schedule | + +Summarise evidence: + +```bash +cd ~/state-hub +uv run python scripts/compare_consistency_sweep_parallel.py --since-hours 24 +``` + +Expect some `lock_skipped: true` events when both schedules overlap — that is +healthy idempotence, not duplicate work. + +Parallel window: **2026-06-21 → 2026-06-28** (review before T04 cutover). + ## Cutover After one parallel week (`STATE-WP-0064-T03`): @@ -121,5 +142,4 @@ After one parallel week (`STATE-WP-0064-T03`): systemctl --user disable --now custodian-sync.timer ``` -Then enable the activity-core definition and treat the cluster schedule -as the sole primary runner. \ No newline at end of file +The cluster definition stays enabled; disable only the local timer. \ No newline at end of file diff --git a/infra/README.md b/infra/README.md index 9c90008..7608a20 100644 --- a/infra/README.md +++ b/infra/README.md @@ -27,11 +27,11 @@ alongside the per-repo git post-commit hooks). > `/home/worsch/.local/bin/uv run python …`. The pre-extraction path > `/home/worsch/the-custodian/state-hub` is obsolete. > -> **Cluster runner (STATE-WP-0064):** activity-core on Railiance01 can -> trigger the same sweep through `POST /consistency/sweep/remote-all` via -> the `consistency_sweep_remote_all` context query. Keep this local timer -> enabled during the parallel week; disable it after cutover per -> [`docs/consistency-sweep-runbook.md`](../docs/consistency-sweep-runbook.md). +> **Cluster runner (STATE-WP-0064):** activity-core on Railiance01 runs the +> same sweep on `*/15 * * * *` UTC (parallel week started 2026-06-21). Both +> runners use `POST /consistency/sweep/remote-all` with `detail.source` +> tagging (`local-timer` vs `activity-core`). Disable this local timer after +> T04 cutover per [`docs/consistency-sweep-runbook.md`](../docs/consistency-sweep-runbook.md). The all-repo remote sweep has two built-in load guards: diff --git a/infra/systemd/custodian-sync.service b/infra/systemd/custodian-sync.service index 9a7d71a..1202b09 100644 --- a/infra/systemd/custodian-sync.service +++ b/infra/systemd/custodian-sync.service @@ -6,6 +6,6 @@ After=network.target Type=oneshot WorkingDirectory=/home/worsch/state-hub ExecStartPre=/usr/bin/curl -sf http://127.0.0.1:8000/state/health -ExecStart=/home/worsch/.local/bin/uv run python scripts/consistency_check.py --remote --all +ExecStart=/usr/bin/curl -sf -X POST http://127.0.0.1:8000/consistency/sweep/remote-all -H "Content-Type: application/json" -d '{"max_seconds":300,"source":"local-timer"}' StandardOutput=journal StandardError=journal \ No newline at end of file diff --git a/scripts/compare_consistency_sweep_parallel.py b/scripts/compare_consistency_sweep_parallel.py new file mode 100644 index 0000000..ffac239 --- /dev/null +++ b/scripts/compare_consistency_sweep_parallel.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python3 +"""Summarise parallel-week consistency sweep evidence by runner source.""" + +from __future__ import annotations + +import argparse +import json +import sys +import urllib.error +import urllib.request +from collections import Counter, defaultdict +from datetime import UTC, datetime, timedelta + + +def _parse_ts(value: str) -> datetime: + return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(UTC) + + +def _fetch(api_base: str, path: str) -> list[dict]: + with urllib.request.urlopen(f"{api_base.rstrip('/')}{path}") as response: + payload = json.load(response) + return payload if isinstance(payload, list) else [] + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--api-base", default="http://127.0.0.1:8000") + parser.add_argument("--since-hours", type=int, default=24) + parser.add_argument("--json", action="store_true", dest="as_json") + args = parser.parse_args(argv) + + since = datetime.now(tz=UTC) - timedelta(hours=args.since_hours) + try: + events = _fetch(args.api_base, "/progress/?event_type=consistency_sweep_remote_all&limit=500") + except urllib.error.URLError as exc: + print(f"ERROR: could not reach State Hub API: {exc}", file=sys.stderr) + return 1 + + recent = [ + event + for event in events + if isinstance(event.get("created_at"), str) + and _parse_ts(event["created_at"]) >= since + ] + + by_source: dict[str, list[dict]] = defaultdict(list) + for event in recent: + detail = event.get("detail") or {} + source = str(detail.get("source") or "unknown") + by_source[source].append(detail) + + summary = { + "since": since.isoformat().replace("+00:00", "Z"), + "total_events": len(recent), + "by_source": {}, + } + for source, details in sorted(by_source.items()): + summary["by_source"][source] = { + "events": len(details), + "completed": sum(1 for detail in details if not detail.get("lock_skipped")), + "lock_skipped": sum(1 for detail in details if detail.get("lock_skipped")), + "hard_fail_exit": sum(1 for detail in details if detail.get("exit_code") == 1), + "repos_processed": sum(len(detail.get("repos_processed") or []) for detail in details), + "budget_skipped_repos": sum(len(detail.get("skipped_budget") or []) for detail in details), + "exit_codes": dict(Counter(detail.get("exit_code") for detail in details)), + } + + if args.as_json: + print(json.dumps(summary, indent=2)) + return 0 + + print(f"Consistency sweep parallel summary since {summary['since']}") + print(f"Total progress events: {summary['total_events']}") + for source, stats in summary["by_source"].items(): + print(f"\n[{source}]") + print(f" events: {stats['events']}") + print(f" completed: {stats['completed']}") + print(f" lock_skipped: {stats['lock_skipped']}") + print(f" hard_fail_exit: {stats['hard_fail_exit']}") + print(f" repos_processed: {stats['repos_processed']}") + print(f" budget_skipped: {stats['budget_skipped_repos']}") + print(f" exit_codes: {stats['exit_codes']}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) \ No newline at end of file diff --git a/tests/test_consistency_sweep.py b/tests/test_consistency_sweep.py index 490c000..f839f3f 100644 --- a/tests/test_consistency_sweep.py +++ b/tests/test_consistency_sweep.py @@ -17,9 +17,10 @@ from api.services import consistency_sweep as sweep_service @pytest.mark.asyncio async def test_remote_all_sweep_records_progress_and_parses_stderr(client, monkeypatch): - async def fake_run_remote_all_sweep(session, *, max_seconds: int): + async def fake_run_remote_all_sweep(session, *, max_seconds: int, source: str = "api"): return ConsistencySweepRemoteAllRun( max_seconds=max_seconds, + source=source, exit_code=0, lock_skipped=False, repos_processed=[ @@ -57,9 +58,10 @@ async def test_remote_all_sweep_records_progress_and_parses_stderr(client, monke @pytest.mark.asyncio async def test_remote_all_sweep_lock_skip_is_idempotent(client, monkeypatch): - async def fake_run_remote_all_sweep(session, *, max_seconds: int): + async def fake_run_remote_all_sweep(session, *, max_seconds: int, source: str = "api"): return ConsistencySweepRemoteAllRun( max_seconds=max_seconds, + source=source, exit_code=0, lock_skipped=True, repos_processed=[], @@ -175,10 +177,14 @@ async def test_run_remote_all_sweep_invokes_script_and_logs_progress(client, mon monkeypatch.setattr(sweep_service.asyncio, "to_thread", fake_to_thread) monkeypatch.setattr(sweep_service.subprocess, "run", fake_run) - response = await client.post("/consistency/sweep/remote-all", json={"max_seconds": 120}) + response = await client.post( + "/consistency/sweep/remote-all", + json={"max_seconds": 120, "source": "local-timer"}, + ) assert response.status_code == 201, response.text body = response.json() + assert body["source"] == "local-timer" assert "--remote" in captured["cmd"] assert "--all" in captured["cmd"] assert captured["cmd"][captured["cmd"].index("--max-seconds") + 1] == "120" @@ -191,4 +197,5 @@ async def test_run_remote_all_sweep_invokes_script_and_logs_progress(client, mon params={"event_type": "consistency_sweep_remote_all"}, ) assert events.status_code == 200, events.text - assert events.json()[0]["detail"]["skipped_clean"] == ["quiet-repo"] \ No newline at end of file + assert events.json()[0]["detail"]["skipped_clean"] == ["quiet-repo"] + assert events.json()[0]["detail"]["source"] == "local-timer" \ No newline at end of file diff --git a/workplans/STATE-WP-0064-statehub-consistency-sync-railiance01.md b/workplans/STATE-WP-0064-statehub-consistency-sync-railiance01.md index f86f4d4..68764b0 100644 --- a/workplans/STATE-WP-0064-statehub-consistency-sync-railiance01.md +++ b/workplans/STATE-WP-0064-statehub-consistency-sync-railiance01.md @@ -9,6 +9,7 @@ owner: codex topic_slug: custodian created: "2026-06-21" updated: "2026-06-21" +parallel_week_end: "2026-06-28" state_hub_workstream_id: "669d810a-53f4-448b-a0c1-a6543daa7c44" --- @@ -120,7 +121,7 @@ Done 2026-06-21: ```task id: STATE-WP-0064-T03 -status: todo +status: progress priority: medium state_hub_task_id: "8abb31ad-2f03-4aa7-889e-e60c3c39f1f8" ``` @@ -134,6 +135,16 @@ Run cluster schedule (`*/15 * * * *` UTC per design stub) alongside local Document comparison in a progress event or short runbook addendum. +Progress 2026-06-21 (parallel week started): + +- Enabled `state-hub-consistency-sweep` on Railiance01 (`enabled: true`, + Temporal schedule **upserted** — no longer paused). +- Unified both runners on `POST /consistency/sweep/remote-all` with + `detail.source` (`local-timer` vs `activity-core`). +- Local `custodian-sync.service` now calls the API (not direct script). +- Added `scripts/compare_consistency_sweep_parallel.py` and runbook §T3. +- Review window ends **2026-06-28**; then proceed to T04 cutover. + ## T4 — Retire local timer ```task @@ -171,4 +182,5 @@ Mark workplan `finished` when cluster schedule is the sole primary runner. Progress 2026-06-21: `docs/consistency-sweep-runbook.md` added; `infra/README.md` and `docs/cron-migration.md` updated for API + parallel -week. Final cutover wording deferred to T04. \ No newline at end of file +week. Parallel-week observability script landed; final cutover wording +deferred to T04. \ No newline at end of file