Files
state-hub/api/services/consistency_sweep.py
tegwick 39ed5459b9 finish(STATE-WP-0064): cut over scheduler and split sweep errors from failures
STATE-WP-0064 cutover (state-hub only):
- Retire local custodian-sync.timer; archive units under infra/systemd/archived/
- Mark workplan finished; update infra/README, cron-migration, runbook, AGENTS.md
- Point activity-core-delegation at the consistency-sweep runbook

Consistency engine — automation error vs assessment failure:
- C-00 is an automation error; C-01..C-23 assessment failures are recorded
  for follow-up but no longer fail --remote --all scheduled sweeps (exit 0)
- Skip workplans/README.md in the workplan glob (human index, not a workplan)
- Progress events and compare script expose automation_error and
  assessment_failures separately from exit_code
2026-06-22 01:20:59 +02:00

215 lines
7.0 KiB
Python

from __future__ import annotations
import asyncio
import json
import re
import subprocess
import sys
import uuid
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from sqlalchemy.ext.asyncio import AsyncSession
from api.config import settings
from api.models.progress_event import ProgressEvent
from api.schemas.consistency_sweep import (
ConsistencySweepIssueSummary,
ConsistencySweepRemoteAllRun,
ConsistencySweepRepoResult,
)
# Substring looked for in the sweep script's stderr; when present the run is
# reported as "lock skipped" rather than as a failure.
_LOCK_SKIP_MARKER = "another fix-consistency-remote --all run is already active"

# Line-anchored patterns for the "skipped" report lines in the script's stderr.
# Group 1 of each pattern captures the comma-separated list of repo slugs.
_CLEAN_RE = re.compile(
    r"^\s*CLEAN \(skipped\):\s*(.+)$",
    flags=re.MULTILINE,
)
_MISSING_RE = re.compile(
    r"^\s*NOT ON THIS HOST \(skipped\):\s*(.+)$",
    flags=re.MULTILINE,
)
_BUDGET_RE = re.compile(
    r"^\s*BUDGET EXHAUSTED after \d+s \(skipped\):\s*(.+)$",
    flags=re.MULTILINE,
)
def _script_path() -> Path:
    """Return the path of ``scripts/consistency_check.py``.

    The path is resolved relative to this module: three directory levels up
    from this file, then into ``scripts``.
    """
    module_dir = Path(__file__).parent
    base_dir = module_dir.parent.parent
    return base_dir.joinpath("scripts", "consistency_check.py")
def _split_slug_list(value: str) -> list[str]:
    """Split a comma-separated string into its trimmed, non-empty entries."""
    trimmed = map(str.strip, value.split(","))
    # filter(None, ...) drops entries that are empty after trimming.
    return list(filter(None, trimmed))
def _parse_stderr(stderr: str) -> dict[str, list[str]]:
    """Extract the skipped-repo lists reported on the script's stderr.

    Args:
        stderr: Captured stderr text of the consistency check script.

    Returns:
        A dict with the keys ``skipped_clean``, ``skipped_missing`` and
        ``skipped_budget`` (in that order). Each value is the list of slugs
        from the first matching report line, or an empty list when no such
        line is present.
    """
    patterns = {
        "skipped_clean": _CLEAN_RE,
        "skipped_missing": _MISSING_RE,
        "skipped_budget": _BUDGET_RE,
    }
    parsed: dict[str, list[str]] = {}
    for key, pattern in patterns.items():
        # Search once per pattern and reuse the match object.
        match = pattern.search(stderr)
        parsed[key] = _split_slug_list(match.group(1)) if match else []
    return parsed
def _extract_json_payload(text: str) -> Any:
    """Return the first JSON object or array that can be decoded from *text*.

    Any text before the payload (and anything after it) is ignored: every
    ``{`` or ``[`` is tried in order as a possible start of a JSON document
    and the first one that decodes wins.

    Returns:
        The decoded payload, or ``[]`` when *text* is empty or whitespace.

    Raises:
        json.JSONDecodeError: If no position in *text* decodes as JSON.
    """
    body = text.strip()
    if body == "":
        return []

    decode_at = json.JSONDecoder().raw_decode
    # NOTE(review): when the outermost document is malformed or truncated, a
    # nested object/array that parses on its own is returned instead --
    # confirm that this is the intended fallback.
    candidate_starts = (pos for pos, ch in enumerate(body) if ch in "{[")
    for pos in candidate_starts:
        try:
            value, _consumed = decode_at(body, pos)
        except json.JSONDecodeError:
            continue
        return value

    raise json.JSONDecodeError("No JSON payload found", body, 0)
def _parse_stdout(stdout: str) -> list[ConsistencySweepRepoResult]:
    """Convert the script's JSON stdout into per-repo result models.

    A single JSON object is treated as a one-element list. Missing or falsy
    fields fall back to defaults: empty strings for ``repo_slug`` and
    ``repo_path``, ``"pass"`` for ``result``, zero for the summary counters
    and an empty list for ``fixes_applied``.

    Returns:
        One ``ConsistencySweepRepoResult`` per decoded item; an empty list
        when *stdout* is empty or whitespace.
    """
    trimmed = stdout.strip()
    if trimmed == "":
        return []

    decoded = _extract_json_payload(trimmed)
    entries = decoded if isinstance(decoded, list) else [decoded]

    def _to_result(entry: Any) -> ConsistencySweepRepoResult:
        # Entries are expected to be JSON objects (dicts).
        counts = entry.get("summary") or {}
        return ConsistencySweepRepoResult(
            repo_slug=str(entry.get("repo_slug") or ""),
            repo_path=str(entry.get("repo_path") or ""),
            result=str(entry.get("result") or "pass"),
            summary=ConsistencySweepIssueSummary(
                fail=int(counts.get("fail", 0)),
                automation_error=int(counts.get("automation_error", 0)),
                warn=int(counts.get("warn", 0)),
                info=int(counts.get("info", 0)),
            ),
            fixes_applied=list(entry.get("fixes_applied") or []),
        )

    return [_to_result(entry) for entry in entries]
async def run_remote_all_sweep(
    session: AsyncSession,
    *,
    max_seconds: int,
    source: str = "api",
) -> ConsistencySweepRemoteAllRun:
    """Run the consistency check script in ``--remote --all`` mode and record it.

    The script is executed as a subprocess with ``--json`` output. Its stderr
    is inspected for the lock-skip marker and the skipped-repo report lines,
    its stdout is parsed into per-repo results, and a progress event is
    written through *session* before the run summary is returned.

    A run counts as an automation error when it was not lock-skipped and
    either the script exited non-zero or its stdout could not be parsed.
    Unparseable stdout is logged and reported with an empty
    ``repos_processed`` list, so the progress event is still written instead
    of the parse exception aborting the call.

    Args:
        session: Database session used to persist the progress event.
        max_seconds: Time budget forwarded to the script via ``--max-seconds``.
        source: Label stored with the run to identify what triggered it.

    Returns:
        The run summary, including the id of the logged progress event.
    """
    started_at = datetime.now(tz=UTC)
    cmd = [
        sys.executable,
        str(_script_path()),
        "--remote",
        "--all",
        "--json",
        "--api-base",
        settings.api_base,
        "--max-seconds",
        str(max_seconds),
    ]
    # subprocess.run blocks, so it is pushed to a worker thread.
    result = await asyncio.to_thread(
        subprocess.run,
        cmd,
        capture_output=True,
        text=True,
    )
    completed_at = datetime.now(tz=UTC)

    lock_skipped = _LOCK_SKIP_MARKER in result.stderr
    stderr_meta = _parse_stderr(result.stderr)

    repos_processed: list[ConsistencySweepRepoResult] = []
    output_unparseable = False
    if not lock_skipped:
        try:
            repos_processed = _parse_stdout(result.stdout)
        except (ValueError, TypeError, AttributeError):
            # ValueError covers json.JSONDecodeError (no JSON in stdout) and
            # bad counter values; TypeError/AttributeError cover items that do
            # not have the expected object shape. Presumably this also covers
            # schema validation errors from the result models -- confirm.
            import logging

            logging.getLogger(__name__).warning(
                "consistency sweep stdout could not be parsed (exit_code=%s)",
                result.returncode,
                exc_info=True,
            )
            output_unparseable = True

    automation_error = not lock_skipped and (
        result.returncode != 0 or output_unparseable
    )

    progress_event_id = await _log_sweep_progress(
        session,
        started_at=started_at,
        completed_at=completed_at,
        max_seconds=max_seconds,
        source=source,
        exit_code=result.returncode,
        automation_error=automation_error,
        lock_skipped=lock_skipped,
        repos_processed=repos_processed,
        **stderr_meta,
    )
    return ConsistencySweepRemoteAllRun(
        started_at=started_at,
        completed_at=completed_at,
        max_seconds=max_seconds,
        source=source,
        exit_code=result.returncode,
        automation_error=automation_error,
        lock_skipped=lock_skipped,
        repos_processed=repos_processed,
        skipped_clean=stderr_meta["skipped_clean"],
        skipped_missing=stderr_meta["skipped_missing"],
        skipped_budget=stderr_meta["skipped_budget"],
        progress_event_id=progress_event_id,
    )
async def _log_sweep_progress(
    session: AsyncSession,
    *,
    started_at: datetime,
    completed_at: datetime,
    max_seconds: int,
    source: str,
    exit_code: int,
    automation_error: bool,
    lock_skipped: bool,
    repos_processed: list[ConsistencySweepRepoResult],
    skipped_clean: list[str],
    skipped_missing: list[str],
    skipped_budget: list[str],
) -> uuid.UUID:
    """Persist a ``consistency_sweep_remote_all`` progress event for one run.

    The event summary is one of three messages (lock-skipped, automation
    error, or completed with counts); the event detail carries the full run
    data. The event is added to *session*, committed and refreshed.

    Returns:
        The id of the stored progress event.
    """
    processed_count = len(repos_processed)

    # Tally per-repo outcomes in a single pass.
    error_count = 0
    assessment_fail_count = 0
    warn_count = 0
    for repo in repos_processed:
        outcome = repo.result
        if outcome == "error":
            error_count += 1
        elif outcome == "fail":
            assessment_fail_count += 1
        elif outcome == "warn":
            warn_count += 1

    if lock_skipped:
        summary = "State Hub consistency sweep skipped: prior remote-all run still active"
    elif automation_error:
        summary = (
            "State Hub consistency sweep automation error: "
            + f"exit_code={exit_code}, {processed_count} repos partially processed"
        )
    else:
        count_parts = [
            f"{processed_count} processed",
            f"{len(skipped_clean)} clean",
            f"{len(skipped_missing)} missing",
            f"{len(skipped_budget)} budget-skipped",
            f"{assessment_fail_count} assessment-fail",
            f"{error_count} automation-error",
            f"{warn_count} warned",
        ]
        summary = "State Hub consistency sweep completed: " + ", ".join(count_parts)

    detail = {
        "started_at": _iso(started_at),
        "completed_at": _iso(completed_at),
        "max_seconds": max_seconds,
        "source": source,
        "exit_code": exit_code,
        "automation_error": automation_error,
        "assessment_failures": assessment_fail_count,
        "automation_errors": error_count,
        "lock_skipped": lock_skipped,
        "repos_processed": [repo.model_dump(mode="json") for repo in repos_processed],
        "skipped_clean": skipped_clean,
        "skipped_missing": skipped_missing,
        "skipped_budget": skipped_budget,
    }
    event = ProgressEvent(
        event_type="consistency_sweep_remote_all",
        summary=summary,
        detail=detail,
        author="state-hub",
    )
    session.add(event)
    await session.commit()
    # Reload the row after commit before reading its id.
    await session.refresh(event)
    return event.id
def _iso(value: datetime) -> str:
return value.astimezone(UTC).isoformat().replace("+00:00", "Z")