feat(STATE-WP-0064): start parallel week with source-tagged sweep runners

Tag consistency_sweep_remote_all progress events by source, route the local
timer through the API, add a parallel-week comparison script, and document
the 2026-06-21 to 2026-06-28 observation window for T03.
This commit is contained in:
2026-06-21 21:46:43 +02:00
parent 696b628142
commit ab14e77e77
9 changed files with 157 additions and 16 deletions

View File

@@ -25,7 +25,11 @@ async def sweep_remote_all(
session: AsyncSession = Depends(get_session),
) -> ConsistencySweepRemoteAllRun:
try:
return await run_remote_all_sweep(session, max_seconds=body.max_seconds)
return await run_remote_all_sweep(
session,
max_seconds=body.max_seconds,
source=body.source,
)
except json.JSONDecodeError as exc:
raise HTTPException(
status_code=500,

View File

@@ -27,12 +27,17 @@ class ConsistencySweepRemoteAllGenerate(BaseModel):
le=3600,
description="Wall-clock budget for the remote-all sweep (0 disables)",
)
source: str = Field(
default="api",
description="Runner label stored on progress events (local-timer, activity-core, api)",
)
class ConsistencySweepRemoteAllRun(BaseModel):
started_at: datetime
completed_at: datetime
max_seconds: int
source: str
exit_code: int
lock_skipped: bool
repos_processed: list[ConsistencySweepRepoResult] = Field(default_factory=list)

View File

@@ -8,6 +8,7 @@ import sys
import uuid
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from sqlalchemy.ext.asyncio import AsyncSession
@@ -95,6 +96,7 @@ async def run_remote_all_sweep(
session: AsyncSession,
*,
max_seconds: int,
source: str = "api",
) -> ConsistencySweepRemoteAllRun:
started_at = datetime.now(tz=UTC)
cmd = [
@@ -124,6 +126,7 @@ async def run_remote_all_sweep(
started_at=started_at,
completed_at=completed_at,
max_seconds=max_seconds,
source=source,
exit_code=result.returncode,
lock_skipped=lock_skipped,
repos_processed=repos_processed,
@@ -133,6 +136,7 @@ async def run_remote_all_sweep(
started_at=started_at,
completed_at=completed_at,
max_seconds=max_seconds,
source=source,
exit_code=result.returncode,
lock_skipped=lock_skipped,
repos_processed=repos_processed,
@@ -149,6 +153,7 @@ async def _log_sweep_progress(
started_at: datetime,
completed_at: datetime,
max_seconds: int,
source: str,
exit_code: int,
lock_skipped: bool,
repos_processed: list[ConsistencySweepRepoResult],
@@ -175,6 +180,7 @@ async def _log_sweep_progress(
"started_at": _iso(started_at),
"completed_at": _iso(completed_at),
"max_seconds": max_seconds,
"source": source,
"exit_code": exit_code,
"lock_skipped": lock_skipped,
"repos_processed": [item.model_dump(mode="json") for item in repos_processed],

View File

@@ -65,7 +65,7 @@ Expected definition:
- trigger: `*/15 * * * *`
- timezone: `UTC`
- misfire policy: `skip`
- enabled: `false` until manual canary passes, then `true` after cutover
- enabled: `true` during parallel week (T03); local timer retired after T04
## Progress Event Check
@@ -113,6 +113,27 @@ Before enabling the cluster schedule:
3. Verify the progress event and ActivityRun context snapshot.
4. Confirm idempotence when the local timer also fires (lock skip is OK).
## Parallel week observability (T03)
Both runners call the same API and tag progress events with `detail.source`:
| Source | Runner |
|--------|--------|
| `local-timer` | `custodian-sync.timer` on the workstation |
| `activity-core` | Railiance01 Temporal schedule |
Summarise evidence:
```bash
cd ~/state-hub
uv run python scripts/compare_consistency_sweep_parallel.py --since-hours 24
```
Expect some `lock_skipped: true` events when both schedules overlap — that is
healthy idempotence, not duplicate work.
Parallel window: **2026-06-21 → 2026-06-28** (review before T04 cutover).
## Cutover
After one parallel week (`STATE-WP-0064-T03`):
@@ -121,5 +142,4 @@ After one parallel week (`STATE-WP-0064-T03`):
systemctl --user disable --now custodian-sync.timer
```
Then enable the activity-core definition and treat the cluster schedule
as the sole primary runner.
The cluster definition stays enabled; disable only the local timer.

View File

@@ -27,11 +27,11 @@ alongside the per-repo git post-commit hooks).
> `/home/worsch/.local/bin/uv run python …`. The pre-extraction path
> `/home/worsch/the-custodian/state-hub` is obsolete.
>
> **Cluster runner (STATE-WP-0064):** activity-core on Railiance01 can
> trigger the same sweep through `POST /consistency/sweep/remote-all` via
> the `consistency_sweep_remote_all` context query. Keep this local timer
> enabled during the parallel week; disable it after cutover per
> [`docs/consistency-sweep-runbook.md`](../docs/consistency-sweep-runbook.md).
> **Cluster runner (STATE-WP-0064):** activity-core on Railiance01 runs the
> same sweep on `*/15 * * * *` UTC (parallel week started 2026-06-21). Both
> runners use `POST /consistency/sweep/remote-all` with `detail.source`
> tagging (`local-timer` vs `activity-core`). Disable this local timer after
> T04 cutover per [`docs/consistency-sweep-runbook.md`](../docs/consistency-sweep-runbook.md).
The all-repo remote sweep has two built-in load guards:

View File

@@ -6,6 +6,6 @@ After=network.target
Type=oneshot
WorkingDirectory=/home/worsch/state-hub
ExecStartPre=/usr/bin/curl -sf http://127.0.0.1:8000/state/health
ExecStart=/home/worsch/.local/bin/uv run python scripts/consistency_check.py --remote --all
ExecStart=/usr/bin/curl -sf -X POST http://127.0.0.1:8000/consistency/sweep/remote-all -H "Content-Type: application/json" -d '{"max_seconds":300,"source":"local-timer"}'
StandardOutput=journal
StandardError=journal

View File

@@ -0,0 +1,87 @@
#!/usr/bin/env python3
"""Summarise parallel-week consistency sweep evidence by runner source."""
from __future__ import annotations
import argparse
import json
import sys
import urllib.error
import urllib.request
from collections import Counter, defaultdict
from datetime import UTC, datetime, timedelta
def _parse_ts(value: str) -> datetime:
return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(UTC)
def _fetch(api_base: str, path: str) -> list[dict]:
with urllib.request.urlopen(f"{api_base.rstrip('/')}{path}") as response:
payload = json.load(response)
return payload if isinstance(payload, list) else []
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--api-base", default="http://127.0.0.1:8000")
parser.add_argument("--since-hours", type=int, default=24)
parser.add_argument("--json", action="store_true", dest="as_json")
args = parser.parse_args(argv)
since = datetime.now(tz=UTC) - timedelta(hours=args.since_hours)
try:
events = _fetch(args.api_base, "/progress/?event_type=consistency_sweep_remote_all&limit=500")
except urllib.error.URLError as exc:
print(f"ERROR: could not reach State Hub API: {exc}", file=sys.stderr)
return 1
recent = [
event
for event in events
if isinstance(event.get("created_at"), str)
and _parse_ts(event["created_at"]) >= since
]
by_source: dict[str, list[dict]] = defaultdict(list)
for event in recent:
detail = event.get("detail") or {}
source = str(detail.get("source") or "unknown")
by_source[source].append(detail)
summary = {
"since": since.isoformat().replace("+00:00", "Z"),
"total_events": len(recent),
"by_source": {},
}
for source, details in sorted(by_source.items()):
summary["by_source"][source] = {
"events": len(details),
"completed": sum(1 for detail in details if not detail.get("lock_skipped")),
"lock_skipped": sum(1 for detail in details if detail.get("lock_skipped")),
"hard_fail_exit": sum(1 for detail in details if detail.get("exit_code") == 1),
"repos_processed": sum(len(detail.get("repos_processed") or []) for detail in details),
"budget_skipped_repos": sum(len(detail.get("skipped_budget") or []) for detail in details),
"exit_codes": dict(Counter(detail.get("exit_code") for detail in details)),
}
if args.as_json:
print(json.dumps(summary, indent=2))
return 0
print(f"Consistency sweep parallel summary since {summary['since']}")
print(f"Total progress events: {summary['total_events']}")
for source, stats in summary["by_source"].items():
print(f"\n[{source}]")
print(f" events: {stats['events']}")
print(f" completed: {stats['completed']}")
print(f" lock_skipped: {stats['lock_skipped']}")
print(f" hard_fail_exit: {stats['hard_fail_exit']}")
print(f" repos_processed: {stats['repos_processed']}")
print(f" budget_skipped: {stats['budget_skipped_repos']}")
print(f" exit_codes: {stats['exit_codes']}")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -17,9 +17,10 @@ from api.services import consistency_sweep as sweep_service
@pytest.mark.asyncio
async def test_remote_all_sweep_records_progress_and_parses_stderr(client, monkeypatch):
async def fake_run_remote_all_sweep(session, *, max_seconds: int):
async def fake_run_remote_all_sweep(session, *, max_seconds: int, source: str = "api"):
return ConsistencySweepRemoteAllRun(
max_seconds=max_seconds,
source=source,
exit_code=0,
lock_skipped=False,
repos_processed=[
@@ -57,9 +58,10 @@ async def test_remote_all_sweep_records_progress_and_parses_stderr(client, monke
@pytest.mark.asyncio
async def test_remote_all_sweep_lock_skip_is_idempotent(client, monkeypatch):
async def fake_run_remote_all_sweep(session, *, max_seconds: int):
async def fake_run_remote_all_sweep(session, *, max_seconds: int, source: str = "api"):
return ConsistencySweepRemoteAllRun(
max_seconds=max_seconds,
source=source,
exit_code=0,
lock_skipped=True,
repos_processed=[],
@@ -175,10 +177,14 @@ async def test_run_remote_all_sweep_invokes_script_and_logs_progress(client, mon
monkeypatch.setattr(sweep_service.asyncio, "to_thread", fake_to_thread)
monkeypatch.setattr(sweep_service.subprocess, "run", fake_run)
response = await client.post("/consistency/sweep/remote-all", json={"max_seconds": 120})
response = await client.post(
"/consistency/sweep/remote-all",
json={"max_seconds": 120, "source": "local-timer"},
)
assert response.status_code == 201, response.text
body = response.json()
assert body["source"] == "local-timer"
assert "--remote" in captured["cmd"]
assert "--all" in captured["cmd"]
assert captured["cmd"][captured["cmd"].index("--max-seconds") + 1] == "120"
@@ -191,4 +197,5 @@ async def test_run_remote_all_sweep_invokes_script_and_logs_progress(client, mon
params={"event_type": "consistency_sweep_remote_all"},
)
assert events.status_code == 200, events.text
assert events.json()[0]["detail"]["skipped_clean"] == ["quiet-repo"]
assert events.json()[0]["detail"]["skipped_clean"] == ["quiet-repo"]
assert events.json()[0]["detail"]["source"] == "local-timer"

View File

@@ -9,6 +9,7 @@ owner: codex
topic_slug: custodian
created: "2026-06-21"
updated: "2026-06-21"
parallel_week_end: "2026-06-28"
state_hub_workstream_id: "669d810a-53f4-448b-a0c1-a6543daa7c44"
---
@@ -120,7 +121,7 @@ Done 2026-06-21:
```task
id: STATE-WP-0064-T03
status: todo
status: progress
priority: medium
state_hub_task_id: "8abb31ad-2f03-4aa7-889e-e60c3c39f1f8"
```
@@ -134,6 +135,16 @@ Run cluster schedule (`*/15 * * * *` UTC per design stub) alongside local
Document comparison in a progress event or short runbook addendum.
Progress 2026-06-21 (parallel week started):
- Enabled `state-hub-consistency-sweep` on Railiance01 (`enabled: true`,
Temporal schedule **upserted** — no longer paused).
- Unified both runners on `POST /consistency/sweep/remote-all` with
`detail.source` (`local-timer` vs `activity-core`).
- Local `custodian-sync.service` now calls the API (not direct script).
- Added `scripts/compare_consistency_sweep_parallel.py` and runbook §T3.
- Review window ends **2026-06-28**; then proceed to T04 cutover.
## T4 — Retire local timer
```task
@@ -171,4 +182,5 @@ Mark workplan `finished` when cluster schedule is the sole primary runner.
Progress 2026-06-21: `docs/consistency-sweep-runbook.md` added;
`infra/README.md` and `docs/cron-migration.md` updated for API + parallel
week. Final cutover wording deferred to T04.
week. Parallel-week observability script landed; final cutover wording
deferred to T04.