generated from coulomb/repo-seed
Extract the JSON payload from mixed script output and document Railiance01 kubectl sync steps. Mark T02 done after cluster bridge and resolver canaries.
194 lines
6.1 KiB
Python
194 lines
6.1 KiB
Python
from __future__ import annotations

import asyncio
import json
import re
import subprocess
import sys
import uuid
from datetime import UTC, datetime
from pathlib import Path
from typing import Any

from sqlalchemy.ext.asyncio import AsyncSession

from api.config import settings
from api.models.progress_event import ProgressEvent
from api.schemas.consistency_sweep import (
    ConsistencySweepIssueSummary,
    ConsistencySweepRemoteAllRun,
    ConsistencySweepRepoResult,
)
|
|
|
|
_LOCK_SKIP_MARKER = "another fix-consistency-remote --all run is already active"
|
|
_CLEAN_RE = re.compile(r"^\s*CLEAN \(skipped\):\s*(.+)$", re.MULTILINE)
|
|
_MISSING_RE = re.compile(r"^\s*NOT ON THIS HOST \(skipped\):\s*(.+)$", re.MULTILINE)
|
|
_BUDGET_RE = re.compile(
|
|
r"^\s*BUDGET EXHAUSTED after \d+s \(skipped\):\s*(.+)$",
|
|
re.MULTILINE,
|
|
)
|
|
|
|
|
|
def _script_path() -> Path:
|
|
return Path(__file__).parent.parent.parent / "scripts" / "consistency_check.py"
|
|
|
|
|
|
def _split_slug_list(value: str) -> list[str]:
|
|
return [part.strip() for part in value.split(",") if part.strip()]
|
|
|
|
|
|
def _parse_stderr(stderr: str) -> dict[str, list[str]]:
    """Collect the skipped-repo lists reported in the script's stderr.

    Returns a dict with the keys ``skipped_clean``, ``skipped_missing`` and
    ``skipped_budget``. Each value is the slug list captured by the first
    match of the corresponding pattern, or an empty list when the pattern
    does not match.
    """
    patterns = (
        ("skipped_clean", _CLEAN_RE),
        ("skipped_missing", _MISSING_RE),
        ("skipped_budget", _BUDGET_RE),
    )
    parsed: dict[str, list[str]] = {}
    for key, pattern in patterns:
        found = pattern.search(stderr)
        parsed[key] = _split_slug_list(found.group(1)) if found else []
    return parsed
|
|
|
|
|
|
def _extract_json_payload(text: str) -> Any:
|
|
stripped = text.strip()
|
|
if not stripped:
|
|
return []
|
|
decoder = json.JSONDecoder()
|
|
for index, char in enumerate(stripped):
|
|
if char not in "{[":
|
|
continue
|
|
try:
|
|
payload, _end = decoder.raw_decode(stripped, index)
|
|
return payload
|
|
except json.JSONDecodeError:
|
|
continue
|
|
raise json.JSONDecodeError("No JSON payload found", stripped, 0)
|
|
|
|
|
|
def _parse_stdout(stdout: str) -> list[ConsistencySweepRepoResult]:
|
|
text = stdout.strip()
|
|
if not text:
|
|
return []
|
|
payload = _extract_json_payload(text)
|
|
items = payload if isinstance(payload, list) else [payload]
|
|
results: list[ConsistencySweepRepoResult] = []
|
|
for item in items:
|
|
summary = item.get("summary") or {}
|
|
results.append(
|
|
ConsistencySweepRepoResult(
|
|
repo_slug=str(item.get("repo_slug") or ""),
|
|
repo_path=str(item.get("repo_path") or ""),
|
|
result=str(item.get("result") or "pass"),
|
|
summary=ConsistencySweepIssueSummary(
|
|
fail=int(summary.get("fail", 0)),
|
|
warn=int(summary.get("warn", 0)),
|
|
info=int(summary.get("info", 0)),
|
|
),
|
|
fixes_applied=list(item.get("fixes_applied") or []),
|
|
)
|
|
)
|
|
return results
|
|
|
|
|
|
async def run_remote_all_sweep(
    session: AsyncSession,
    *,
    max_seconds: int,
) -> ConsistencySweepRemoteAllRun:
    """Run the consistency script with ``--remote --all`` and record the outcome.

    The script is launched with the current interpreter through
    ``subprocess.run`` inside ``asyncio.to_thread``. Its stderr is inspected
    for the lock-skip marker and the skipped-repo report lines; its stdout is
    parsed into per-repo results unless the run was lock-skipped. A progress
    event is written via ``_log_sweep_progress`` before the run summary is
    returned.

    Args:
        session: Database session used to persist the progress event.
        max_seconds: Value forwarded to the script's ``--max-seconds`` option.

    Returns:
        A ``ConsistencySweepRemoteAllRun`` describing the run, including the
        id of the stored progress event.
    """
    started_at = datetime.now(tz=UTC)

    argv = [sys.executable, str(_script_path())]
    argv += ["--remote", "--all", "--json"]
    argv += ["--api-base", settings.api_base]
    argv += ["--max-seconds", str(max_seconds)]

    proc = await asyncio.to_thread(subprocess.run, argv, capture_output=True, text=True)
    completed_at = datetime.now(tz=UTC)

    lock_skipped = _LOCK_SKIP_MARKER in proc.stderr
    skips = _parse_stderr(proc.stderr)
    if lock_skipped:
        # stdout is not parsed when the script reported an active prior run.
        repos_processed = []
    else:
        repos_processed = _parse_stdout(proc.stdout)

    # The progress logger and the response model take the same fields, so
    # assemble them once and reuse the mapping for both.
    outcome = {
        "started_at": started_at,
        "completed_at": completed_at,
        "max_seconds": max_seconds,
        "exit_code": proc.returncode,
        "lock_skipped": lock_skipped,
        "repos_processed": repos_processed,
        "skipped_clean": skips["skipped_clean"],
        "skipped_missing": skips["skipped_missing"],
        "skipped_budget": skips["skipped_budget"],
    }
    progress_event_id = await _log_sweep_progress(session, **outcome)
    return ConsistencySweepRemoteAllRun(**outcome, progress_event_id=progress_event_id)
|
|
|
|
|
|
async def _log_sweep_progress(
    session: AsyncSession,
    *,
    started_at: datetime,
    completed_at: datetime,
    max_seconds: int,
    exit_code: int,
    lock_skipped: bool,
    repos_processed: list[ConsistencySweepRepoResult],
    skipped_clean: list[str],
    skipped_missing: list[str],
    skipped_budget: list[str],
) -> uuid.UUID:
    """Store a ``ProgressEvent`` describing one remote-all sweep.

    The event summary is a fixed "skipped" sentence when the run was
    lock-skipped; otherwise it lists how many repos were processed, clean,
    missing, budget-skipped, failed and warned. The full run data goes into
    the event's ``detail`` mapping.

    Returns:
        The id of the committed and refreshed event.
    """
    if lock_skipped:
        summary = "State Hub consistency sweep skipped: prior remote-all run still active"
    else:
        outcomes = [repo.result for repo in repos_processed]
        processed_count = len(outcomes)
        fail_count = outcomes.count("fail")
        warn_count = outcomes.count("warn")
        summary = (
            "State Hub consistency sweep completed: "
            f"{processed_count} processed, {len(skipped_clean)} clean, "
            f"{len(skipped_missing)} missing, {len(skipped_budget)} budget-skipped, "
            f"{fail_count} failed, {warn_count} warned"
        )

    detail = {
        "started_at": _iso(started_at),
        "completed_at": _iso(completed_at),
        "max_seconds": max_seconds,
        "exit_code": exit_code,
        "lock_skipped": lock_skipped,
        "repos_processed": [item.model_dump(mode="json") for item in repos_processed],
        "skipped_clean": skipped_clean,
        "skipped_missing": skipped_missing,
        "skipped_budget": skipped_budget,
    }
    event = ProgressEvent(
        event_type="consistency_sweep_remote_all",
        summary=summary,
        detail=detail,
        author="state-hub",
    )
    session.add(event)
    await session.commit()
    # Refresh after commit so the id read below comes from the reloaded row.
    await session.refresh(event)
    return event.id
|
|
|
|
|
|
def _iso(value: datetime) -> str:
|
|
return value.astimezone(UTC).isoformat().replace("+00:00", "Z") |