feat(STATE-WP-0064): add consistency sweep remote-all API endpoint

Expose POST /consistency/sweep/remote-all so activity-core can trigger
the workstation ADR-001 remote-all sweep via the bridge tunnel pattern.
Records consistency_sweep_remote_all progress events and documents the
cutover runbook while the local custodian-sync timer remains interim.
This commit is contained in:
2026-06-21 20:19:22 +02:00
parent 0fdebc6aa8
commit 5a7a6ef5ee
9 changed files with 599 additions and 50 deletions

View File

@@ -17,6 +17,7 @@ from api.routers import token_events
from api.routers import interface_changes
from api.routers import flows
from api.routers import recently_on_scope
from api.routers import consistency_sweep
from api.routers import reconciliation
from api.routers import execution
from api.routers import fabric
@@ -102,6 +103,7 @@ app.add_middleware(
app.include_router(domains.router)
app.include_router(recently_on_scope.hourly_router)
app.include_router(recently_on_scope.router)
app.include_router(consistency_sweep.router)
app.include_router(repos.router)
app.include_router(topics.router)
app.include_router(workstreams.router)

View File

@@ -0,0 +1,33 @@
from __future__ import annotations
import json
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from api.database import get_session
from api.schemas.consistency_sweep import (
ConsistencySweepRemoteAllGenerate,
ConsistencySweepRemoteAllRun,
)
from api.services.consistency_sweep import run_remote_all_sweep
router = APIRouter(prefix="/consistency/sweep", tags=["consistency"])


@router.post(
    "/remote-all",
    response_model=ConsistencySweepRemoteAllRun,
    status_code=status.HTTP_201_CREATED,
)
async def sweep_remote_all(
    body: ConsistencySweepRemoteAllGenerate,
    session: AsyncSession = Depends(get_session),
) -> ConsistencySweepRemoteAllRun:
    """Run the remote-all consistency sweep and return its parsed outcome.

    Delegates to ``run_remote_all_sweep`` with the request's ``max_seconds``.

    Raises:
        HTTPException: status 500 when the service raises
            ``json.JSONDecodeError``.
    """
    budget = body.max_seconds
    try:
        outcome = await run_remote_all_sweep(session, max_seconds=budget)
    except json.JSONDecodeError as exc:
        message = f"Consistency sweep returned invalid JSON: {exc}"
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=message,
        ) from exc
    return outcome

View File

@@ -0,0 +1,42 @@
from __future__ import annotations
import uuid
from datetime import datetime
from pydantic import BaseModel, Field
class ConsistencySweepIssueSummary(BaseModel):
    """Issue counts split into fail / warn / info buckets."""

    # Every bucket defaults to zero when the field is omitted.
    fail: int = 0
    warn: int = 0
    info: int = 0
class ConsistencySweepRepoResult(BaseModel):
    """Sweep result for a single repository."""

    repo_slug: str
    repo_path: str
    # Plain string, not validated against a fixed set of values.
    # NOTE(review): presumably "pass" / "warn" / "fail" — confirm against the script.
    result: str
    summary: ConsistencySweepIssueSummary
    # A fresh empty list per instance when omitted.
    fixes_applied: list[str] = Field(default_factory=list)
class ConsistencySweepRemoteAllGenerate(BaseModel):
    """Request body carrying the sweep's time budget."""

    # Validated to the inclusive range 0..3600; defaults to 300 when omitted.
    max_seconds: int = Field(
        default=300,
        ge=0,
        le=3600,
        description="Wall-clock budget for the remote-all sweep (0 disables)",
    )
class ConsistencySweepRemoteAllRun(BaseModel):
    """Summary of one remote-all sweep invocation."""

    started_at: datetime
    completed_at: datetime
    max_seconds: int
    exit_code: int
    lock_skipped: bool
    # Every list field below defaults to a fresh empty list when omitted.
    repos_processed: list[ConsistencySweepRepoResult] = Field(default_factory=list)
    skipped_clean: list[str] = Field(default_factory=list)
    skipped_missing: list[str] = Field(default_factory=list)
    skipped_budget: list[str] = Field(default_factory=list)
    # Optional; None when no progress event id is supplied.
    progress_event_id: uuid.UUID | None = None

View File

@@ -0,0 +1,178 @@
from __future__ import annotations
import asyncio
import json
import re
import subprocess
import sys
import uuid
from datetime import UTC, datetime
from pathlib import Path
from sqlalchemy.ext.asyncio import AsyncSession
from api.config import settings
from api.models.progress_event import ProgressEvent
from api.schemas.consistency_sweep import (
ConsistencySweepIssueSummary,
ConsistencySweepRemoteAllRun,
ConsistencySweepRepoResult,
)
# Substring searched for in the sweep script's stderr to detect a lock-skipped run.
_LOCK_SKIP_MARKER = "another fix-consistency-remote --all run is already active"
# Each pattern below matches one (optionally indented) stderr line and captures
# the text after the colon. MULTILINE makes ^ and $ anchor at every line.
_CLEAN_RE = re.compile(r"^\s*CLEAN \(skipped\):\s*(.+)$", re.MULTILINE)
_MISSING_RE = re.compile(r"^\s*NOT ON THIS HOST \(skipped\):\s*(.+)$", re.MULTILINE)
# The budget line also carries an elapsed-seconds figure, which is matched but not captured.
_BUDGET_RE = re.compile(
    r"^\s*BUDGET EXHAUSTED after \d+s \(skipped\):\s*(.+)$",
    re.MULTILINE,
)
def _script_path() -> Path:
return Path(__file__).parent.parent.parent / "scripts" / "consistency_check.py"
def _split_slug_list(value: str) -> list[str]:
return [part.strip() for part in value.split(",") if part.strip()]
def _parse_stderr(stderr: str) -> dict[str, list[str]]:
    """Extract the three skip lists from the sweep script's stderr.

    For each category the first matching line is used and its comma-separated
    payload is split into slugs; a category with no matching line maps to an
    empty list.

    Returns:
        A dict with the keys ``skipped_clean``, ``skipped_missing`` and
        ``skipped_budget``, in that order.
    """
    patterns = (
        ("skipped_clean", _CLEAN_RE),
        ("skipped_missing", _MISSING_RE),
        ("skipped_budget", _BUDGET_RE),
    )
    parsed: dict[str, list[str]] = {}
    for key, pattern in patterns:
        found = pattern.search(stderr)
        if found is None:
            parsed[key] = []
        else:
            parsed[key] = _split_slug_list(found.group(1))
    return parsed
def _parse_stdout(stdout: str) -> list[ConsistencySweepRepoResult]:
    """Parse the sweep script's JSON stdout into per-repo results.

    Accepts either a single JSON object or a JSON array of objects. Blank
    output yields an empty list. Missing or null fields fall back to defaults:
    empty strings for slug and path, ``"pass"`` for ``result``, zero for every
    count, and no fixes.

    Args:
        stdout: Raw standard output captured from the script.

    Returns:
        One ``ConsistencySweepRepoResult`` per JSON object, in input order.

    Raises:
        json.JSONDecodeError: if non-blank ``stdout`` is not valid JSON.
    """
    text = stdout.strip()
    if not text:
        return []
    payload = json.loads(text)
    items = payload if isinstance(payload, list) else [payload]
    results: list[ConsistencySweepRepoResult] = []
    for item in items:
        summary = item.get("summary") or {}
        # Use `or 0` rather than a .get() default so an explicit JSON null
        # count reads as zero instead of raising TypeError in int(); this
        # matches the null tolerance of the other fields below.
        counts = {
            level: int(summary.get(level) or 0)
            for level in ("fail", "warn", "info")
        }
        results.append(
            ConsistencySweepRepoResult(
                repo_slug=str(item.get("repo_slug") or ""),
                repo_path=str(item.get("repo_path") or ""),
                result=str(item.get("result") or "pass"),
                summary=ConsistencySweepIssueSummary(**counts),
                fixes_applied=list(item.get("fixes_applied") or []),
            )
        )
    return results
async def run_remote_all_sweep(
    session: AsyncSession,
    *,
    max_seconds: int,
) -> ConsistencySweepRemoteAllRun:
    """Run the consistency script in remote-all mode and record the outcome.

    The script is launched with the current interpreter in a worker thread,
    its stderr is scanned for the lock-skip marker and the skip lists, and its
    stdout is parsed as JSON unless the run was lock-skipped. A progress event
    is then written and its id is included in the returned summary.

    Args:
        session: Database session used to persist the progress event.
        max_seconds: Value forwarded to the script's ``--max-seconds`` option.

    Raises:
        json.JSONDecodeError: if the script's stdout is not valid JSON.
    """
    started_at = datetime.now(tz=UTC)
    script_args = [
        "--remote",
        "--all",
        "--json",
        "--api-base",
        settings.api_base,
        "--max-seconds",
        str(max_seconds),
    ]
    cmd = [sys.executable, str(_script_path()), *script_args]
    # subprocess.run blocks, so it is pushed off the event loop.
    result = await asyncio.to_thread(
        subprocess.run,
        cmd,
        capture_output=True,
        text=True,
    )
    completed_at = datetime.now(tz=UTC)

    stderr_text = result.stderr
    lock_skipped = _LOCK_SKIP_MARKER in stderr_text
    skip_lists = _parse_stderr(stderr_text)
    repos_processed: list[ConsistencySweepRepoResult]
    if lock_skipped:
        # stdout is not parsed at all for a lock-skipped run.
        repos_processed = []
    else:
        repos_processed = _parse_stdout(result.stdout)

    run_fields = {
        "started_at": started_at,
        "completed_at": completed_at,
        "max_seconds": max_seconds,
        "exit_code": result.returncode,
        "lock_skipped": lock_skipped,
        "repos_processed": repos_processed,
        "skipped_clean": skip_lists["skipped_clean"],
        "skipped_missing": skip_lists["skipped_missing"],
        "skipped_budget": skip_lists["skipped_budget"],
    }
    progress_event_id = await _log_sweep_progress(session, **run_fields)
    return ConsistencySweepRemoteAllRun(
        **run_fields,
        progress_event_id=progress_event_id,
    )
async def _log_sweep_progress(
    session: AsyncSession,
    *,
    started_at: datetime,
    completed_at: datetime,
    max_seconds: int,
    exit_code: int,
    lock_skipped: bool,
    repos_processed: list[ConsistencySweepRepoResult],
    skipped_clean: list[str],
    skipped_missing: list[str],
    skipped_budget: list[str],
) -> uuid.UUID:
    """Persist a ``consistency_sweep_remote_all`` progress event.

    The summary line is either a fixed "skipped" message (lock-skipped run) or
    a tally of processed / clean / missing / budget-skipped / failed / warned
    repos. The event is added, committed and refreshed.

    Returns:
        The ``id`` of the stored event.
    """
    outcomes = [repo.result for repo in repos_processed]
    processed_count = len(outcomes)
    fail_count = outcomes.count("fail")
    warn_count = outcomes.count("warn")

    summary = "State Hub consistency sweep skipped: prior remote-all run still active"
    if not lock_skipped:
        summary = (
            "State Hub consistency sweep completed: "
            f"{processed_count} processed, {len(skipped_clean)} clean, "
            f"{len(skipped_missing)} missing, {len(skipped_budget)} budget-skipped, "
            f"{fail_count} failed, {warn_count} warned"
        )

    serialised_repos = [item.model_dump(mode="json") for item in repos_processed]
    detail = {
        "started_at": _iso(started_at),
        "completed_at": _iso(completed_at),
        "max_seconds": max_seconds,
        "exit_code": exit_code,
        "lock_skipped": lock_skipped,
        "repos_processed": serialised_repos,
        "skipped_clean": skipped_clean,
        "skipped_missing": skipped_missing,
        "skipped_budget": skipped_budget,
    }
    event = ProgressEvent(
        event_type="consistency_sweep_remote_all",
        summary=summary,
        detail=detail,
        author="state-hub",
    )
    session.add(event)
    await session.commit()
    # Reload the row after the commit before reading its id.
    await session.refresh(event)
    return event.id
def _iso(value: datetime) -> str:
return value.astimezone(UTC).isoformat().replace("+00:00", "Z")

View File

@@ -0,0 +1,105 @@
# State Hub Consistency Sweep Runbook
## Purpose
This runbook answers whether the 15-minute State Hub consistency sync ran
without relying on the local `custodian-sync.timer`.
The intended steady state after `STATE-WP-0064` cutover is:
- activity-core on Railiance01 owns the `*/15 * * * *` UTC schedule and
ActivityRun audit trail.
- State Hub on the workstation owns `scripts/consistency_check.py`, lock
semantics, reconciliation, and the `consistency_sweep_remote_all`
progress event.
- The local systemd timer is disabled after the parallel week passes.
## API Surface
Manual or cluster-triggered invocation:
```bash
curl -s -X POST http://127.0.0.1:8000/consistency/sweep/remote-all \
-H "Content-Type: application/json" \
-d '{"max_seconds": 300}' | python3 -m json.tool
```
From Railiance01 through the bridge tunnel, use the `STATE_HUB_URL`
configured for activity-core (for example the `actcore-state-hub-bridge`
service target).
## Schedule Check
From the activity-core host, confirm the definition is synced and the
Temporal schedule exists:
```bash
cd ~/activity-core
ACTIVITY_DEFINITION_DIRS=/home/worsch/the-custodian make sync-activity-definitions
make sync-schedules
```
Expected definition:
- name: `State Hub Consistency Sweep`
- trigger: `*/15 * * * *`
- timezone: `UTC`
- misfire policy: `skip`
- enabled: `false` until manual canary passes, then `true` after cutover
## Progress Event Check
Query State Hub for the latest sweep event:
```bash
curl -s "http://127.0.0.1:8000/progress/?event_type=consistency_sweep_remote_all&limit=5" \
| python3 -m json.tool
```
Healthy evidence includes:
- `lock_skipped: false` on normal runs
- `repos_processed` entries only for repos that needed action
- `skipped_clean`, `skipped_missing`, and `skipped_budget` metadata when
applicable
- `exit_code: 0` for warn-only remote-all sweeps
A `lock_skipped: true` response is normal when the local timer and the
cluster schedule overlap during the parallel week.
## ActivityRun Check
Query the activity-core database for the most recent run of the sweep
definition:
```sql
select
run_id,
fired_at,
scheduled_for,
context_snapshot->'consistency_sweep_remote_all' as sweep_result
from activity_runs
where definition_id = '7c4e9a12-8f3b-4d5e-9c6a-1b2d3e4f5a6b'
order by fired_at desc
limit 5;
```
## Manual Canary
Before enabling the cluster schedule:
1. Confirm `state-hub-railiance01` tunnel health from ops-bridge.
2. Trigger one manual ActivityRun or POST the API through the bridge URL.
3. Verify the progress event and ActivityRun context snapshot.
4. Confirm idempotence when the local timer also fires (lock skip is OK).
## Cutover
After one parallel week (`STATE-WP-0064-T03`):
```bash
systemctl --user disable --now custodian-sync.timer
```
Then enable the activity-core definition and treat the cluster schedule
as the sole primary runner.

View File

@@ -1,8 +1,9 @@
# State Hub Cron → activity-core ActivityDefinition Migration (Design Stub)
# State Hub Cron → activity-core ActivityDefinition Migration
> CUST-WP-0040 T04. **Design stub — not yet implemented.**
> Migration depends on activity-core WP-0003 reaching the
> "ActivityDefinition file ingestion + cron trigger executor" milestone.
> CUST-WP-0040 T04. **Partially implemented** as of `STATE-WP-0064`.
> The consistency sweep API surface and ActivityDefinition are landed;
> cluster cutover still requires manual canary, parallel week, and local
> timer retirement.
The state hub currently runs two recurring maintenance jobs and one
per-repo event hook. Once activity-core is ready, each becomes an
@@ -36,41 +37,30 @@ run them on a schedule.
## 2. Target ActivityDefinitions
### A. `state-hub-consistency-sweep`
### A. `state-hub-consistency-sweep` (implemented)
```yaml
# activity-definitions/state-hub-consistency-sweep.yaml
id: the-custodian.state-hub-consistency-sweep
description: |
Sweep all registered repos: pull, reconcile workplan files ↔ DB,
apply writeback (C-15), respect pull gate (C-16). Mirrors the
existing custodian-sync systemd timer.
trigger:
trigger_type: cron
cron_expression: "*/15 * * * *"
timezone: UTC
misfire_policy: skip # if a prior run is still active, skip
context:
- kind: http_get # confirm state-hub API is reachable
url: http://127.0.0.1:8000/state/health
bind: hub_health
rule:
when:
- "hub_health.status == 'ok'"
instruction:
kind: shell
cmd: >-
cd /home/worsch/state-hub &&
.venv/bin/python scripts/consistency_check.py --remote --all --max-seconds 300
on_failure: log_and_continue # warn-only sweeps must not page on transient failures
```
Landed in `the-custodian/activity-definitions/state-hub-consistency-sweep.md`
with `enabled: false` until canary and cutover.
Invocation path (matches the hourly RecentlyOnScope pattern):
- activity-core context query: `consistency_sweep_remote_all`
- State Hub endpoint: `POST /consistency/sweep/remote-all`
- payload: `{"max_seconds": 300}`
- progress event: `consistency_sweep_remote_all`
State Hub runs `scripts/consistency_check.py --remote --all --json` on the
workstation host. activity-core does **not** shell into the laptop repo
checkout from the cluster.
Operator runbook: [`docs/consistency-sweep-runbook.md`](consistency-sweep-runbook.md).
Notes:
- Replaces the `custodian-sync.service` + `custodian-sync.timer` pair.
- Replaces the `custodian-sync.service` + `custodian-sync.timer` pair
after parallel week and cutover.
- Lock semantics (`/tmp/custodian-consistency-remote-all.lock`) stay in
the script — activity-core just sets the cadence.
- Once active, `infra/README.md` is updated to instruct users to delete
the systemd timer.
- Local timer retirement is tracked in `STATE-WP-0064-T04`.
### B. `state-hub-stale-task-cleanup`
@@ -119,16 +109,16 @@ trigger:
## 3. Required context queries
Both A and B want to confirm the state hub is reachable before running.
A reusable context source should be added to activity-core for this:
Implemented for A:
- `consistency_sweep_remote_all` → `POST /consistency/sweep/remote-all`
with a 330s resolver timeout (sweep budget default 300s).
Still optional for B and future splits:
- `state-hub.health` → `GET /state/health` → `{status, db, ...}`
- (optional) `state-hub.repos` → `GET /repos/?status=active` for the
sweep's per-repo branching, if we later split A into one
ActivityDefinition per repo.
These belong to the state-hub adapter referenced in the workplan's
out-of-scope note ("/sbom/status context query endpoint" etc.).
- (optional) `state-hub.repos` → `GET /repos/?status=active` for per-repo
ActivityDefinitions if the monolithic sweep is split later.
---
@@ -140,8 +130,8 @@ out-of-scope note ("/sbom/status context query endpoint" etc.).
| activity-core shell instruction kind with on_failure semantics | activity-core | activity-core/`src/...` |
| state-hub adapter exposing `state-hub.health` as a context source | activity-core | activity-core/adapters/ |
Until these land, the state hub continues to schedule jobs via systemd
timer + cron entries.
Until B lands and A is cut over, the state hub continues to schedule the
consistency sweep via the local systemd timer.
---

View File

@@ -25,8 +25,13 @@ alongside the per-repo git post-commit hooks).
> **Interim local runner (STATE-WP-0063):** units must target the standalone
> repo at `/home/worsch/state-hub` and invoke consistency via
> `/home/worsch/.local/bin/uv run python …`. The pre-extraction path
> `/home/worsch/the-custodian/state-hub` is obsolete. Scheduling moves to
> Railiance01 activity-core in `STATE-WP-0064`.
> `/home/worsch/the-custodian/state-hub` is obsolete.
>
> **Cluster runner (STATE-WP-0064):** activity-core on Railiance01 can
> trigger the same sweep through `POST /consistency/sweep/remote-all` via
> the `consistency_sweep_remote_all` context query. Keep this local timer
> enabled during the parallel week; disable it after cutover per
> [`docs/consistency-sweep-runbook.md`](../docs/consistency-sweep-runbook.md).
The all-repo remote sweep has two built-in load guards:

View File

@@ -0,0 +1,182 @@
from __future__ import annotations
import json
from datetime import UTC, datetime
from types import SimpleNamespace
from unittest.mock import AsyncMock
import pytest
from api.schemas.consistency_sweep import (
ConsistencySweepIssueSummary,
ConsistencySweepRemoteAllRun,
ConsistencySweepRepoResult,
)
from api.services import consistency_sweep as sweep_service
@pytest.mark.asyncio
async def test_remote_all_sweep_records_progress_and_parses_stderr(client, monkeypatch):
    """The endpoint answers 201 and serialises the service result as given.

    The service function is replaced by a stub, so this covers only the
    router and the response schema.
    """
    processed_repo = ConsistencySweepRepoResult(
        repo_slug="state-hub",
        repo_path="/home/worsch/state-hub",
        result="pass",
        summary=ConsistencySweepIssueSummary(info=1),
        fixes_applied=["pull: already up to date"],
    )

    async def fake_run_remote_all_sweep(session, *, max_seconds: int):
        return ConsistencySweepRemoteAllRun(
            started_at=datetime(2026, 6, 21, 12, 0, tzinfo=UTC),
            completed_at=datetime(2026, 6, 21, 12, 1, tzinfo=UTC),
            max_seconds=max_seconds,
            exit_code=0,
            lock_skipped=False,
            repos_processed=[processed_repo],
            skipped_clean=["demo-service"],
            skipped_missing=["remote-only"],
            skipped_budget=[],
            progress_event_id=None,
        )

    stub = AsyncMock(side_effect=fake_run_remote_all_sweep)
    monkeypatch.setattr("api.routers.consistency_sweep.run_remote_all_sweep", stub)

    response = await client.post(
        "/consistency/sweep/remote-all",
        json={"max_seconds": 300},
    )

    assert response.status_code == 201, response.text
    payload = response.json()
    assert payload["exit_code"] == 0
    assert payload["lock_skipped"] is False
    assert payload["repos_processed"][0]["repo_slug"] == "state-hub"
    assert payload["skipped_clean"] == ["demo-service"]
    assert payload["skipped_missing"] == ["remote-only"]
@pytest.mark.asyncio
async def test_remote_all_sweep_lock_skip_is_idempotent(client, monkeypatch):
    """A lock-skipped result still returns 201 with no processed repos.

    An empty request body is posted, so ``max_seconds`` takes its default.
    """
    skipped_run_fields = {
        "exit_code": 0,
        "lock_skipped": True,
        "repos_processed": [],
        "skipped_clean": [],
        "skipped_missing": [],
        "skipped_budget": [],
        "progress_event_id": None,
        "started_at": datetime(2026, 6, 21, 12, 0, tzinfo=UTC),
        "completed_at": datetime(2026, 6, 21, 12, 0, 1, tzinfo=UTC),
    }

    async def fake_run_remote_all_sweep(session, *, max_seconds: int):
        return ConsistencySweepRemoteAllRun(max_seconds=max_seconds, **skipped_run_fields)

    stub = AsyncMock(side_effect=fake_run_remote_all_sweep)
    monkeypatch.setattr("api.routers.consistency_sweep.run_remote_all_sweep", stub)

    response = await client.post("/consistency/sweep/remote-all", json={})

    assert response.status_code == 201, response.text
    assert response.json()["lock_skipped"] is True
    assert response.json()["repos_processed"] == []
def test_parse_stderr_extracts_skip_lists():
    """Each of the three skip lines is split into its own slug list."""
    stderr_lines = [
        " CLEAN (skipped): alpha, beta\n",
        " NOT ON THIS HOST (skipped): gamma\n",
        " BUDGET EXHAUSTED after 300s (skipped): delta, epsilon\n",
    ]
    expected = {
        "skipped_clean": ["alpha", "beta"],
        "skipped_missing": ["gamma"],
        "skipped_budget": ["delta", "epsilon"],
    }

    assert sweep_service._parse_stderr("".join(stderr_lines)) == expected
def test_parse_stdout_handles_single_and_batch_payloads():
    """Both a lone JSON object and a JSON array are parsed into results."""
    warn_repo = {
        "repo_slug": "state-hub",
        "repo_path": "/home/worsch/state-hub",
        "result": "warn",
        "summary": {"fail": 0, "warn": 1, "info": 0},
        "fixes_applied": [],
    }
    alpha_repo = {
        "repo_slug": "alpha",
        "repo_path": "/tmp/alpha",
        "result": "pass",
        "summary": {"fail": 0, "warn": 0, "info": 0},
        "fixes_applied": [],
    }
    beta_repo = {
        "repo_slug": "beta",
        "repo_path": "/tmp/beta",
        "result": "fail",
        "summary": {"fail": 1, "warn": 0, "info": 0},
        "fixes_applied": [],
    }

    single_result = sweep_service._parse_stdout(json.dumps(warn_repo))
    batch_result = sweep_service._parse_stdout(json.dumps([alpha_repo, beta_repo]))

    assert len(single_result) == 1
    assert single_result[0].result == "warn"
    batch_slugs = [repo.repo_slug for repo in batch_result]
    assert batch_slugs == ["alpha", "beta"]
    assert batch_result[1].result == "fail"
@pytest.mark.asyncio
async def test_run_remote_all_sweep_invokes_script_and_logs_progress(client, monkeypatch):
    """End-to-end through the real service, with only the subprocess faked.

    Checks the command line handed to ``subprocess.run``, the response body,
    and that a progress event carrying the parsed skip list can be read back.
    """
    captured: dict[str, object] = {}
    script_stdout = json.dumps(
        [
            {
                "repo_slug": "state-hub",
                "repo_path": "/home/worsch/state-hub",
                "result": "pass",
                "summary": {"fail": 0, "warn": 0, "info": 0},
                "fixes_applied": [],
            }
        ]
    )

    def fake_run(cmd, capture_output, text):
        captured["cmd"] = cmd
        return SimpleNamespace(
            stdout=script_stdout,
            stderr=" CLEAN (skipped): quiet-repo\n",
            returncode=0,
        )

    async def fake_to_thread(fn, *args, **kwargs):
        # Run the callable inline instead of in a worker thread.
        return fn(*args, **kwargs)

    monkeypatch.setattr(sweep_service.asyncio, "to_thread", fake_to_thread)
    monkeypatch.setattr(sweep_service.subprocess, "run", fake_run)

    response = await client.post(
        "/consistency/sweep/remote-all",
        json={"max_seconds": 120},
    )

    assert response.status_code == 201, response.text
    body = response.json()
    cmd = captured["cmd"]
    assert "--remote" in cmd
    assert "--all" in cmd
    budget_position = cmd.index("--max-seconds") + 1
    assert cmd[budget_position] == "120"
    assert body["exit_code"] == 0
    assert body["skipped_clean"] == ["quiet-repo"]
    assert body["progress_event_id"] is not None

    events = await client.get(
        "/progress/",
        params={"event_type": "consistency_sweep_remote_all"},
    )
    assert events.status_code == 200, events.text
    first_event = events.json()[0]
    assert first_event["detail"]["skipped_clean"] == ["quiet-repo"]

View File

@@ -4,7 +4,7 @@ type: workplan
title: "Move State Hub consistency sync to Railiance01 (activity-core)"
domain: custodian
repo: state-hub
status: ready
status: active
owner: codex
topic_slug: custodian
created: "2026-06-21"
@@ -64,7 +64,7 @@ Out of scope:
```task
id: STATE-WP-0064-T01
status: todo
status: done
priority: high
state_hub_task_id: "ecc0f846-e00f-4063-8ec1-f6ad630e9265"
```
@@ -80,6 +80,14 @@ from the draft in `docs/cron-migration.md` §2A, adjusting:
Sync definition to Railiance01 activity-core (projection manifest per
`hourly-recently-on-scope` precedent). Enable after manual canary.
Done 2026-06-21:
- State Hub `POST /consistency/sweep/remote-all` + progress event
`consistency_sweep_remote_all`
- ActivityDefinition in `the-custodian/activity-definitions/` (`enabled: false`)
- activity-core resolver query + k8s projection in `20-runtime.yaml`
- Uses API invocation pattern (not cluster shell into laptop repo)
## T2 — Manual canary on Railiance01
```task
@@ -136,7 +144,7 @@ complete. Update `docs/activity-core-delegation.md` cross-reference.
```task
id: STATE-WP-0064-T05
status: todo
status: progress
priority: low
state_hub_task_id: "270ed7dd-aa79-469d-a817-e3fa1e71be41"
```
@@ -147,4 +155,8 @@ state_hub_task_id: "270ed7dd-aa79-469d-a817-e3fa1e71be41"
note blockers cleared.
- Dashboard or AGENTS snippet: "State Hub consistency sync" terminology.
Mark workplan `finished` when cluster schedule is the sole primary runner.
Mark workplan `finished` when cluster schedule is the sole primary runner.
Progress 2026-06-21: `docs/consistency-sweep-runbook.md` added;
`infra/README.md` and `docs/cron-migration.md` updated for API + parallel
week. Final cutover wording deferred to T04.