From 5a7a6ef5eec0f2eddb25a51ddd2c276e682969be Mon Sep 17 00:00:00 2001 From: tegwick Date: Sun, 21 Jun 2026 20:19:22 +0200 Subject: [PATCH] feat(STATE-WP-0064): add consistency sweep remote-all API endpoint Expose POST /consistency/sweep/remote-all so activity-core can trigger the workstation ADR-001 remote-all sweep via the bridge tunnel pattern. Records consistency_sweep_remote_all progress events and documents the cutover runbook while the local custodian-sync timer remains interim. --- api/main.py | 2 + api/routers/consistency_sweep.py | 33 ++++ api/schemas/consistency_sweep.py | 42 ++++ api/services/consistency_sweep.py | 178 +++++++++++++++++ docs/consistency-sweep-runbook.md | 105 ++++++++++ docs/cron-migration.md | 78 ++++---- infra/README.md | 9 +- tests/test_consistency_sweep.py | 182 ++++++++++++++++++ ...4-statehub-consistency-sync-railiance01.md | 20 +- 9 files changed, 599 insertions(+), 50 deletions(-) create mode 100644 api/routers/consistency_sweep.py create mode 100644 api/schemas/consistency_sweep.py create mode 100644 api/services/consistency_sweep.py create mode 100644 docs/consistency-sweep-runbook.md create mode 100644 tests/test_consistency_sweep.py diff --git a/api/main.py b/api/main.py index 4ac2d43..cf6f5d5 100644 --- a/api/main.py +++ b/api/main.py @@ -17,6 +17,7 @@ from api.routers import token_events from api.routers import interface_changes from api.routers import flows from api.routers import recently_on_scope +from api.routers import consistency_sweep from api.routers import reconciliation from api.routers import execution from api.routers import fabric @@ -102,6 +103,7 @@ app.add_middleware( app.include_router(domains.router) app.include_router(recently_on_scope.hourly_router) app.include_router(recently_on_scope.router) +app.include_router(consistency_sweep.router) app.include_router(repos.router) app.include_router(topics.router) app.include_router(workstreams.router) diff --git a/api/routers/consistency_sweep.py 
@router.post(
    "/remote-all",
    response_model=ConsistencySweepRemoteAllRun,
    status_code=status.HTTP_201_CREATED,
)
async def sweep_remote_all(
    body: ConsistencySweepRemoteAllGenerate,
    session: AsyncSession = Depends(get_session),
) -> ConsistencySweepRemoteAllRun:
    """Trigger the remote-all consistency sweep and return its run record.

    Delegates to ``run_remote_all_sweep`` with the ``max_seconds`` budget
    taken from the request body.

    Raises:
        HTTPException: status 500 when the sweep raises
            ``json.JSONDecodeError``; the original error is chained as the
            cause. Any other exception propagates unchanged.
    """
    try:
        sweep_run = await run_remote_all_sweep(
            session,
            max_seconds=body.max_seconds,
        )
    except json.JSONDecodeError as decode_error:
        message = f"Consistency sweep returned invalid JSON: {decode_error}"
        raise HTTPException(
            # Same value as the literal 500, spelled like the 201 above.
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=message,
        ) from decode_error
    return sweep_run
# Marker lines looked for in the sweep subprocess's stderr. Group 1 of each
# pattern captures the comma-separated slug list that follows the marker.
_CLEAN_RE = re.compile(r"^\s*CLEAN \(skipped\):\s*(.+)$", re.MULTILINE)
_MISSING_RE = re.compile(r"^\s*NOT ON THIS HOST \(skipped\):\s*(.+)$", re.MULTILINE)
_BUDGET_RE = re.compile(
    r"^\s*BUDGET EXHAUSTED after \d+s \(skipped\):\s*(.+)$",
    re.MULTILINE,
)


def _split_slug_list(value: str) -> list[str]:
    """Split a comma-separated list into trimmed, non-empty entries."""
    # Strip each part once, then drop the entries that end up empty.
    return [slug for slug in map(str.strip, value.split(",")) if slug]


def _first_slug_list(pattern: re.Pattern[str], stderr: str) -> list[str]:
    """Return the slugs captured by the first match of *pattern*, else ``[]``."""
    match = pattern.search(stderr)
    if match is None:
        return []
    return _split_slug_list(match.group(1))


def _parse_stderr(stderr: str) -> dict[str, list[str]]:
    """Extract the skip lists reported on the sweep's stderr.

    Args:
        stderr: Captured stderr text of the sweep subprocess.

    Returns:
        A dict with the keys ``skipped_clean``, ``skipped_missing`` and
        ``skipped_budget``. Each value is the slug list from the first line
        matching the corresponding marker, or an empty list when the marker
        is absent.
    """
    # Each pattern is searched exactly once; the earlier form ran every
    # search twice (once for the condition, once for the capture).
    return {
        "skipped_clean": _first_slug_list(_CLEAN_RE, stderr),
        "skipped_missing": _first_slug_list(_MISSING_RE, stderr),
        "skipped_budget": _first_slug_list(_BUDGET_RE, stderr),
    }
repos_processed=repos_processed, + skipped_clean=stderr_meta["skipped_clean"], + skipped_missing=stderr_meta["skipped_missing"], + skipped_budget=stderr_meta["skipped_budget"], + progress_event_id=progress_event_id, + ) + + +async def _log_sweep_progress( + session: AsyncSession, + *, + started_at: datetime, + completed_at: datetime, + max_seconds: int, + exit_code: int, + lock_skipped: bool, + repos_processed: list[ConsistencySweepRepoResult], + skipped_clean: list[str], + skipped_missing: list[str], + skipped_budget: list[str], +) -> uuid.UUID: + processed_count = len(repos_processed) + fail_count = sum(1 for repo in repos_processed if repo.result == "fail") + warn_count = sum(1 for repo in repos_processed if repo.result == "warn") + if lock_skipped: + summary = "State Hub consistency sweep skipped: prior remote-all run still active" + else: + summary = ( + "State Hub consistency sweep completed: " + f"{processed_count} processed, {len(skipped_clean)} clean, " + f"{len(skipped_missing)} missing, {len(skipped_budget)} budget-skipped, " + f"{fail_count} failed, {warn_count} warned" + ) + event = ProgressEvent( + event_type="consistency_sweep_remote_all", + summary=summary, + detail={ + "started_at": _iso(started_at), + "completed_at": _iso(completed_at), + "max_seconds": max_seconds, + "exit_code": exit_code, + "lock_skipped": lock_skipped, + "repos_processed": [item.model_dump(mode="json") for item in repos_processed], + "skipped_clean": skipped_clean, + "skipped_missing": skipped_missing, + "skipped_budget": skipped_budget, + }, + author="state-hub", + ) + session.add(event) + await session.commit() + await session.refresh(event) + return event.id + + +def _iso(value: datetime) -> str: + return value.astimezone(UTC).isoformat().replace("+00:00", "Z") \ No newline at end of file diff --git a/docs/consistency-sweep-runbook.md b/docs/consistency-sweep-runbook.md new file mode 100644 index 0000000..f7974e0 --- /dev/null +++ b/docs/consistency-sweep-runbook.md @@ -0,0 
+1,105 @@ +# State Hub Consistency Sweep Runbook + +## Purpose + +This runbook answers whether the 15-minute State Hub consistency sync ran +without relying on the local `custodian-sync.timer`. + +The intended steady state after `STATE-WP-0064` cutover is: + +- activity-core on Railiance01 owns the `*/15 * * * *` UTC schedule and + ActivityRun audit trail. +- State Hub on the workstation owns `scripts/consistency_check.py`, lock + semantics, reconciliation, and the `consistency_sweep_remote_all` + progress event. +- The local systemd timer is disabled after the parallel week passes. + +## API Surface + +Manual or cluster-triggered invocation: + +```bash +curl -s -X POST http://127.0.0.1:8000/consistency/sweep/remote-all \ + -H "Content-Type: application/json" \ + -d '{"max_seconds": 300}' | python3 -m json.tool +``` + +From Railiance01 through the bridge tunnel, use the `STATE_HUB_URL` +configured for activity-core (for example the `actcore-state-hub-bridge` +service target). + +## Schedule Check + +From the activity-core host, confirm the definition is synced and the +Temporal schedule exists: + +```bash +cd ~/activity-core +ACTIVITY_DEFINITION_DIRS=/home/worsch/the-custodian make sync-activity-definitions +make sync-schedules +``` + +Expected definition: + +- name: `State Hub Consistency Sweep` +- trigger: `*/15 * * * *` +- timezone: `UTC` +- misfire policy: `skip` +- enabled: `false` until manual canary passes, then `true` after cutover + +## Progress Event Check + +Query State Hub for the latest sweep event: + +```bash +curl -s "http://127.0.0.1:8000/progress/?event_type=consistency_sweep_remote_all&limit=5" \ + | python3 -m json.tool +``` + +Healthy evidence includes: + +- `lock_skipped: false` on normal runs +- `repos_processed` entries only for repos that needed action +- `skipped_clean`, `skipped_missing`, and `skipped_budget` metadata when + applicable +- `exit_code: 0` for warn-only remote-all sweeps + +A `lock_skipped: true` response is normal when the 
local timer and the +cluster schedule overlap during the parallel week. + +## ActivityRun Check + +Query the activity-core database for the most recent run of the sweep +definition: + +```sql +select + run_id, + fired_at, + scheduled_for, + context_snapshot->'consistency_sweep_remote_all' as sweep_result +from activity_runs +where definition_id = '7c4e9a12-8f3b-4d5e-9c6a-1b2d3e4f5a6b' +order by fired_at desc +limit 5; +``` + +## Manual Canary + +Before enabling the cluster schedule: + +1. Confirm `state-hub-railiance01` tunnel health from ops-bridge. +2. Trigger one manual ActivityRun or POST the API through the bridge URL. +3. Verify the progress event and ActivityRun context snapshot. +4. Confirm idempotence when the local timer also fires (lock skip is OK). + +## Cutover + +After one parallel week (`STATE-WP-0064-T03`): + +```bash +systemctl --user disable --now custodian-sync.timer +``` + +Then enable the activity-core definition and treat the cluster schedule +as the sole primary runner. \ No newline at end of file diff --git a/docs/cron-migration.md b/docs/cron-migration.md index e0fc58d..d45ffd3 100644 --- a/docs/cron-migration.md +++ b/docs/cron-migration.md @@ -1,8 +1,9 @@ -# State Hub Cron → activity-core ActivityDefinition Migration (Design Stub) +# State Hub Cron → activity-core ActivityDefinition Migration -> CUST-WP-0040 T04. **Design stub — not yet implemented.** -> Migration depends on activity-core WP-0003 reaching the -> "ActivityDefinition file ingestion + cron trigger executor" milestone. +> CUST-WP-0040 T04. **Partially implemented** as of `STATE-WP-0064`. +> The consistency sweep API surface and ActivityDefinition are landed; +> cluster cutover still requires manual canary, parallel week, and local +> timer retirement. The state hub currently runs two recurring maintenance jobs and one per-repo event hook. Once activity-core is ready, each becomes an @@ -36,41 +37,30 @@ run them on a schedule. ## 2. Target ActivityDefinitions -### A. 
`state-hub-consistency-sweep` +### A. `state-hub-consistency-sweep` (implemented) -```yaml -# activity-definitions/state-hub-consistency-sweep.yaml -id: the-custodian.state-hub-consistency-sweep -description: | - Sweep all registered repos: pull, reconcile workplan files ↔ DB, - apply writeback (C-15), respect pull gate (C-16). Mirrors the - existing custodian-sync systemd timer. -trigger: - trigger_type: cron - cron_expression: "*/15 * * * *" - timezone: UTC - misfire_policy: skip # if a prior run is still active, skip -context: - - kind: http_get # confirm state-hub API is reachable - url: http://127.0.0.1:8000/state/health - bind: hub_health -rule: - when: - - "hub_health.status == 'ok'" -instruction: - kind: shell - cmd: >- - cd /home/worsch/state-hub && - .venv/bin/python scripts/consistency_check.py --remote --all --max-seconds 300 - on_failure: log_and_continue # warn-only sweeps must not page on transient failures -``` +Landed in `the-custodian/activity-definitions/state-hub-consistency-sweep.md` +with `enabled: false` until canary and cutover. + +Invocation path (matches the hourly RecentlyOnScope pattern): + +- activity-core context query: `consistency_sweep_remote_all` +- State Hub endpoint: `POST /consistency/sweep/remote-all` +- payload: `{"max_seconds": 300}` +- progress event: `consistency_sweep_remote_all` + +State Hub runs `scripts/consistency_check.py --remote --all --json` on the +workstation host. activity-core does **not** shell into the laptop repo +checkout from the cluster. + +Operator runbook: [`docs/consistency-sweep-runbook.md`](consistency-sweep-runbook.md). Notes: -- Replaces the `custodian-sync.service` + `custodian-sync.timer` pair. +- Replaces the `custodian-sync.service` + `custodian-sync.timer` pair + after parallel week and cutover. - Lock semantics (`/tmp/custodian-consistency-remote-all.lock`) stay in the script — activity-core just sets the cadence. 
-- Once active, `infra/README.md` is updated to instruct users to delete - the systemd timer. +- Local timer retirement is tracked in `STATE-WP-0064-T04`. ### B. `state-hub-stale-task-cleanup` @@ -119,16 +109,16 @@ trigger: ## 3. Required context queries -Both A and B want to confirm the state hub is reachable before running. -A reusable context source should be added to activity-core for this: +Implemented for A: + +- `consistency_sweep_remote_all` — `POST /consistency/sweep/remote-all` + with a 330s resolver timeout (sweep budget default 300s). + +Still optional for B and future splits: - `state-hub.health` — `GET /state/health` → `{status, db, ...}` -- (optional) `state-hub.repos` — `GET /repos/?status=active` for the - sweep's per-repo branching, if we later split A into one - ActivityDefinition per repo. - -These belong to the state-hub adapter referenced in the workplan's -out-of-scope note ("/sbom/status context query endpoint" etc.). +- (optional) `state-hub.repos` — `GET /repos/?status=active` for per-repo + ActivityDefinitions if the monolithic sweep is split later. --- @@ -140,8 +130,8 @@ out-of-scope note ("/sbom/status context query endpoint" etc.). | activity-core shell instruction kind with on_failure semantics | activity-core | activity-core/`src/...` | | state-hub adapter exposing `state-hub.health` as a context source | activity-core | activity-core/adapters/ | -Until these land, the state hub continues to schedule jobs via systemd -timer + cron entries. +Until B lands and A is cut over, the state hub continues to schedule the +consistency sweep via the local systemd timer. --- diff --git a/infra/README.md b/infra/README.md index 8308c14..9c90008 100644 --- a/infra/README.md +++ b/infra/README.md @@ -25,8 +25,13 @@ alongside the per-repo git post-commit hooks). > **Interim local runner (STATE-WP-0063):** units must target the standalone > repo at `/home/worsch/state-hub` and invoke consistency via > `/home/worsch/.local/bin/uv run python …`. 
The pre-extraction path -> `/home/worsch/the-custodian/state-hub` is obsolete. Scheduling moves to -> Railiance01 activity-core in `STATE-WP-0064`. +> `/home/worsch/the-custodian/state-hub` is obsolete. +> +> **Cluster runner (STATE-WP-0064):** activity-core on Railiance01 can +> trigger the same sweep through `POST /consistency/sweep/remote-all` via +> the `consistency_sweep_remote_all` context query. Keep this local timer +> enabled during the parallel week; disable it after cutover per +> [`docs/consistency-sweep-runbook.md`](../docs/consistency-sweep-runbook.md). The all-repo remote sweep has two built-in load guards: diff --git a/tests/test_consistency_sweep.py b/tests/test_consistency_sweep.py new file mode 100644 index 0000000..c6958d4 --- /dev/null +++ b/tests/test_consistency_sweep.py @@ -0,0 +1,182 @@ +from __future__ import annotations + +import json +from datetime import UTC, datetime +from types import SimpleNamespace +from unittest.mock import AsyncMock + +import pytest + +from api.schemas.consistency_sweep import ( + ConsistencySweepIssueSummary, + ConsistencySweepRemoteAllRun, + ConsistencySweepRepoResult, +) +from api.services import consistency_sweep as sweep_service + + +@pytest.mark.asyncio +async def test_remote_all_sweep_records_progress_and_parses_stderr(client, monkeypatch): + async def fake_run_remote_all_sweep(session, *, max_seconds: int): + return ConsistencySweepRemoteAllRun( + max_seconds=max_seconds, + exit_code=0, + lock_skipped=False, + repos_processed=[ + ConsistencySweepRepoResult( + repo_slug="state-hub", + repo_path="/home/worsch/state-hub", + result="pass", + summary=ConsistencySweepIssueSummary(info=1), + fixes_applied=["pull: already up to date"], + ) + ], + skipped_clean=["demo-service"], + skipped_missing=["remote-only"], + skipped_budget=[], + progress_event_id=None, + started_at=datetime(2026, 6, 21, 12, 0, tzinfo=UTC), + completed_at=datetime(2026, 6, 21, 12, 1, tzinfo=UTC), + ) + + monkeypatch.setattr( + 
"api.routers.consistency_sweep.run_remote_all_sweep", + AsyncMock(side_effect=fake_run_remote_all_sweep), + ) + + response = await client.post("/consistency/sweep/remote-all", json={"max_seconds": 300}) + + assert response.status_code == 201, response.text + body = response.json() + assert body["exit_code"] == 0 + assert body["lock_skipped"] is False + assert body["repos_processed"][0]["repo_slug"] == "state-hub" + assert body["skipped_clean"] == ["demo-service"] + assert body["skipped_missing"] == ["remote-only"] + + +@pytest.mark.asyncio +async def test_remote_all_sweep_lock_skip_is_idempotent(client, monkeypatch): + async def fake_run_remote_all_sweep(session, *, max_seconds: int): + return ConsistencySweepRemoteAllRun( + max_seconds=max_seconds, + exit_code=0, + lock_skipped=True, + repos_processed=[], + skipped_clean=[], + skipped_missing=[], + skipped_budget=[], + progress_event_id=None, + started_at=datetime(2026, 6, 21, 12, 0, tzinfo=UTC), + completed_at=datetime(2026, 6, 21, 12, 0, 1, tzinfo=UTC), + ) + + monkeypatch.setattr( + "api.routers.consistency_sweep.run_remote_all_sweep", + AsyncMock(side_effect=fake_run_remote_all_sweep), + ) + + response = await client.post("/consistency/sweep/remote-all", json={}) + + assert response.status_code == 201, response.text + assert response.json()["lock_skipped"] is True + assert response.json()["repos_processed"] == [] + + +def test_parse_stderr_extracts_skip_lists(): + stderr = ( + " CLEAN (skipped): alpha, beta\n" + " NOT ON THIS HOST (skipped): gamma\n" + " BUDGET EXHAUSTED after 300s (skipped): delta, epsilon\n" + ) + parsed = sweep_service._parse_stderr(stderr) + assert parsed == { + "skipped_clean": ["alpha", "beta"], + "skipped_missing": ["gamma"], + "skipped_budget": ["delta", "epsilon"], + } + + +def test_parse_stdout_handles_single_and_batch_payloads(): + single = json.dumps( + { + "repo_slug": "state-hub", + "repo_path": "/home/worsch/state-hub", + "result": "warn", + "summary": {"fail": 0, "warn": 1, 
"info": 0}, + "fixes_applied": [], + } + ) + batch = json.dumps( + [ + { + "repo_slug": "alpha", + "repo_path": "/tmp/alpha", + "result": "pass", + "summary": {"fail": 0, "warn": 0, "info": 0}, + "fixes_applied": [], + }, + { + "repo_slug": "beta", + "repo_path": "/tmp/beta", + "result": "fail", + "summary": {"fail": 1, "warn": 0, "info": 0}, + "fixes_applied": [], + }, + ] + ) + + single_result = sweep_service._parse_stdout(single) + batch_result = sweep_service._parse_stdout(batch) + + assert len(single_result) == 1 + assert single_result[0].result == "warn" + assert [repo.repo_slug for repo in batch_result] == ["alpha", "beta"] + assert batch_result[1].result == "fail" + + +@pytest.mark.asyncio +async def test_run_remote_all_sweep_invokes_script_and_logs_progress(client, monkeypatch): + captured: dict[str, object] = {} + + def fake_run(cmd, capture_output, text): + captured["cmd"] = cmd + return SimpleNamespace( + stdout=json.dumps( + [ + { + "repo_slug": "state-hub", + "repo_path": "/home/worsch/state-hub", + "result": "pass", + "summary": {"fail": 0, "warn": 0, "info": 0}, + "fixes_applied": [], + } + ] + ), + stderr=" CLEAN (skipped): quiet-repo\n", + returncode=0, + ) + + async def fake_to_thread(fn, *args, **kwargs): + return fn(*args, **kwargs) + + monkeypatch.setattr(sweep_service.asyncio, "to_thread", fake_to_thread) + monkeypatch.setattr(sweep_service.subprocess, "run", fake_run) + + response = await client.post("/consistency/sweep/remote-all", json={"max_seconds": 120}) + + assert response.status_code == 201, response.text + body = response.json() + assert "--remote" in captured["cmd"] + assert "--all" in captured["cmd"] + assert captured["cmd"][captured["cmd"].index("--max-seconds") + 1] == "120" + assert body["exit_code"] == 0 + assert body["skipped_clean"] == ["quiet-repo"] + assert body["progress_event_id"] is not None + + events = await client.get( + "/progress/", + params={"event_type": "consistency_sweep_remote_all"}, + ) + assert 
events.status_code == 200, events.text + assert events.json()[0]["detail"]["skipped_clean"] == ["quiet-repo"] \ No newline at end of file diff --git a/workplans/STATE-WP-0064-statehub-consistency-sync-railiance01.md b/workplans/STATE-WP-0064-statehub-consistency-sync-railiance01.md index a8d01f3..11314a5 100644 --- a/workplans/STATE-WP-0064-statehub-consistency-sync-railiance01.md +++ b/workplans/STATE-WP-0064-statehub-consistency-sync-railiance01.md @@ -4,7 +4,7 @@ type: workplan title: "Move State Hub consistency sync to Railiance01 (activity-core)" domain: custodian repo: state-hub -status: ready +status: active owner: codex topic_slug: custodian created: "2026-06-21" @@ -64,7 +64,7 @@ Out of scope: ```task id: STATE-WP-0064-T01 -status: todo +status: done priority: high state_hub_task_id: "ecc0f846-e00f-4063-8ec1-f6ad630e9265" ``` @@ -80,6 +80,14 @@ from the draft in `docs/cron-migration.md` §2A, adjusting: Sync definition to Railiance01 activity-core (projection manifest per `hourly-recently-on-scope` precedent). Enable after manual canary. +Done 2026-06-21: + +- State Hub `POST /consistency/sweep/remote-all` + progress event + `consistency_sweep_remote_all` +- ActivityDefinition in `the-custodian/activity-definitions/` (`enabled: false`) +- activity-core resolver query + k8s projection in `20-runtime.yaml` +- Uses API invocation pattern (not cluster shell into laptop repo) + ## T2 — Manual canary on Railiance01 ```task @@ -136,7 +144,7 @@ complete. Update `docs/activity-core-delegation.md` cross-reference. ```task id: STATE-WP-0064-T05 -status: todo +status: progress priority: low state_hub_task_id: "270ed7dd-aa79-469d-a817-e3fa1e71be41" ``` @@ -147,4 +155,8 @@ state_hub_task_id: "270ed7dd-aa79-469d-a817-e3fa1e71be41" note blockers cleared. - Dashboard or AGENTS snippet: "State Hub consistency sync" terminology. -Mark workplan `finished` when cluster schedule is the sole primary runner. 
\ No newline at end of file +Mark workplan `finished` when cluster schedule is the sole primary runner. + +Progress 2026-06-21: `docs/consistency-sweep-runbook.md` added; +`infra/README.md` and `docs/cron-migration.md` updated for API + parallel +week. Final cutover wording deferred to T04. \ No newline at end of file