#!/usr/bin/env python3 """Summarise parallel-week consistency sweep evidence by runner source.""" from __future__ import annotations import argparse import json import sys import urllib.error import urllib.request from collections import Counter, defaultdict from datetime import UTC, datetime, timedelta def _parse_ts(value: str) -> datetime: return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(UTC) def _fetch(api_base: str, path: str) -> list[dict]: with urllib.request.urlopen(f"{api_base.rstrip('/')}{path}") as response: payload = json.load(response) return payload if isinstance(payload, list) else [] def main(argv: list[str] | None = None) -> int: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--api-base", default="http://127.0.0.1:8000") parser.add_argument("--since-hours", type=int, default=24) parser.add_argument("--json", action="store_true", dest="as_json") args = parser.parse_args(argv) since = datetime.now(tz=UTC) - timedelta(hours=args.since_hours) try: events = _fetch(args.api_base, "/progress/?event_type=consistency_sweep_remote_all&limit=500") except urllib.error.URLError as exc: print(f"ERROR: could not reach State Hub API: {exc}", file=sys.stderr) return 1 recent = [ event for event in events if isinstance(event.get("created_at"), str) and _parse_ts(event["created_at"]) >= since ] by_source: dict[str, list[dict]] = defaultdict(list) for event in recent: detail = event.get("detail") or {} source = str(detail.get("source") or "unknown") by_source[source].append(detail) summary = { "since": since.isoformat().replace("+00:00", "Z"), "total_events": len(recent), "by_source": {}, } for source, details in sorted(by_source.items()): summary["by_source"][source] = { "events": len(details), "completed": sum(1 for detail in details if not detail.get("lock_skipped")), "lock_skipped": sum(1 for detail in details if detail.get("lock_skipped")), "hard_fail_exit": sum(1 for detail in details if detail.get("exit_code") == 1), "repos_processed": sum(len(detail.get("repos_processed") or []) for detail in details), "budget_skipped_repos": sum(len(detail.get("skipped_budget") or []) for detail in details), "exit_codes": dict(Counter(detail.get("exit_code") for detail in details)), } if args.as_json: print(json.dumps(summary, indent=2)) return 0 print(f"Consistency sweep parallel summary since {summary['since']}") print(f"Total progress events: {summary['total_events']}") for source, stats in summary["by_source"].items(): print(f"\n[{source}]") print(f" events: {stats['events']}") print(f" completed: {stats['completed']}") print(f" lock_skipped: {stats['lock_skipped']}") print(f" hard_fail_exit: {stats['hard_fail_exit']}") print(f" repos_processed: {stats['repos_processed']}") print(f" budget_skipped: {stats['budget_skipped_repos']}") print(f" exit_codes: {stats['exit_codes']}") return 0 if __name__ == "__main__": raise SystemExit(main())