#!/usr/bin/env python3
"""consistency_check.py — ADR-001 consistency checking engine.

Runs bidirectional checks between workplan files in a registered repo and the
state-hub DB. The file is always authoritative; the DB is the cache/index layer.

Checks (id, name, severity, auto-fixable, description):
  C-01  workplans-dir             FAIL  No   workplans/ directory missing
  C-02  workplan-parse            FAIL  No   Workplan file cannot be parsed
  C-03  workstream-stale-ref      FAIL  No   state_hub_workstream_id in file not in DB
  C-04  workstream-status-drift   WARN  Yes  File status != DB status (file wins)
  C-05  workstream-title-drift    WARN  Yes  File title != DB title (file wins)
  C-06  workstream-unlinked       WARN  Yes  Workplan has no state_hub_workstream_id
  C-07  orphan-db-active          FAIL  No   Active DB workstream, no backing file
  C-08  orphan-db-completed       INFO  No   Completed/archived DB workstream, no file
  C-09  workstream-repo-mismatch  FAIL  Yes  DB workstream repo_id != file location
  C-10  task-status-drift         WARN  Yes  Task status differs between file and DB
  C-11  task-unlinked             WARN  Yes  Task block has no state_hub_task_id
  C-12  orphan-db-task            WARN  No   DB task in workstream has no file backing
  C-13  workstream-auto-complete  WARN  Yes  All DB tasks done but workstream still active
  C-14  ghost-duplicate           WARN  No   Active topic workstream with no repo_id matches
                                             a file-backed title — probable ghost from
                                             premature create_workstream() call
  C-15  task-db-ahead             WARN  Yes  DB task status is ahead of file — regression
                                             prevented; writeback syncs file
  C-16  repo-behind-remote        WARN  No   Local repo is behind remote tracking branch —
                                             --fix skipped to avoid clobbering remote progress
  C-17  repo-ahead-push-failed    WARN  No   Local repo has unpushed commits and push failed —
                                             writes skipped to prevent runaway divergence

Usage:
    python scripts/consistency_check.py --repo SLUG [--fix] [--no-writeback] [--json] [--api-base URL]
    python scripts/consistency_check.py --all [--fix] [--no-writeback] [--json] [--api-base URL]
    python scripts/consistency_check.py --here [PATH] [--fix] [--no-writeback] [--json] [--api-base URL]

Exit codes:
    0 — ok (no FAILs; only WARNs/INFOs)
    1 — one or more FAILs present
    2 — warn-only (no FAILs, but WARNs present)
"""

from __future__ import annotations

import argparse
import json
import re
import socket
import subprocess
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

try:
    import yaml as _yaml
    _HAS_YAML = True
except ImportError:
    _HAS_YAML = False

try:
    import httpx as _httpx
    _HAS_HTTPX = True
except ImportError:
    _HAS_HTTPX = False

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

_TASK_BLOCK_RE = re.compile(r"```task\s*\n(.*?)\n```", re.DOTALL)
_HEADING_RE = re.compile(r"^#{1,4}\s+(.+?)$", re.MULTILINE)

# Frontmatter delimiters. The opening one must be the whole first line; the
# closing one is the first later line consisting of ``---`` only, so a ``---``
# embedded in a value (e.g. a title) is never mistaken for a delimiter.
_FRONTMATTER_OPEN_RE = re.compile(r"---[ \t]*\r?\n")
_FRONTMATTER_CLOSE_RE = re.compile(r"^---(?=[ \t]*\r?$)", re.MULTILINE)

VALID_WP_STATUSES = {"active", "completed", "archived"}
VALID_TASK_STATUSES = {"todo", "in_progress", "blocked", "done", "cancelled"}
VALID_TASK_PRIORITIES = {"low", "medium", "high", "critical"}

# Spellings of "no value" that may appear where a state-hub id is expected
# (YAML null forms, or their stringified versions from the fallback parser).
_NULL_PLACEHOLDERS = frozenset({"", "~", "null", "None", "none"})

# Workplan files use task-style vocabulary ("done"); the DB workstream API uses
# "completed". This map translates file values to DB values before comparison
# and before PATCHing, so "done" vs "completed" is never flagged as C-04 drift.
FILE_TO_DB_WORKSTREAM_STATUS: dict[str, str] = {
    "done": "completed",
    "todo": "active",  # workplan not yet started → active workstream in DB
}

# Ordinal ranking for task statuses used by the no-regress rule (T01/C-15).
# blocked and in_progress share rank 1 — both are "in flight".
STATUS_ORDER: dict[str, int] = {
    "todo": 0,
    "in_progress": 1,
    "blocked": 1,
    "done": 2,
    "cancelled": 2,
}


def normalise_workstream_status(status: str) -> str:
    """Translate a workplan file status value to its DB-canonical equivalent."""
    return FILE_TO_DB_WORKSTREAM_STATUS.get(status, status)


def _clean_ref_id(value: Any) -> str:
    """Normalise a state-hub id read from a workplan file.

    Returns the id with surrounding whitespace and double quotes removed, or
    ``""`` when the value is missing or one of the null placeholders, so that
    workstream ids and task ids are interpreted by the same rule.
    """
    if value is None:
        return ""
    cleaned = str(value).strip().strip('"')
    return "" if cleaned in _NULL_PLACEHOLDERS else cleaned


# ---------------------------------------------------------------------------
# Data types
# ---------------------------------------------------------------------------

@dataclass
class Issue:
    """One finding produced by a consistency check."""

    severity: str  # FAIL | WARN | INFO
    check_id: str  # C-00 through C-17
    message: str
    file_path: str = ""
    db_id: str = ""
    file_value: str = ""
    db_value: str = ""
    fixable: bool = False
    # Data the fix engine needs to repair this issue; excluded from repr.
    _fix_context: dict = field(default_factory=dict, repr=False)


@dataclass
class ConsistencyReport:
    """All issues (and applied fixes) collected for one repo."""

    repo_slug: str
    repo_path: str
    issues: list[Issue] = field(default_factory=list)
    fixes_applied: list[str] = field(default_factory=list)

    def add(self, **kwargs) -> Issue:
        """Create an Issue from *kwargs*, record it, and return it."""
        issue = Issue(**kwargs)
        self.issues.append(issue)
        return issue

    @property
    def failures(self) -> list[Issue]:
        return [i for i in self.issues if i.severity == "FAIL"]

    @property
    def warnings(self) -> list[Issue]:
        return [i for i in self.issues if i.severity == "WARN"]

    @property
    def infos(self) -> list[Issue]:
        return [i for i in self.issues if i.severity == "INFO"]


# ---------------------------------------------------------------------------
# YAML / frontmatter parsing
# ---------------------------------------------------------------------------

def _parse_yaml_block(raw: str) -> dict:
    """Parse a YAML string; fallback to simple key:value if pyyaml unavailable.

    Always returns a dict. Input that is invalid YAML, or valid YAML that is
    not a mapping (a bare list or scalar), yields ``{"_parse_error": True}``
    so callers can rely on ``.get`` and item assignment.
    """
    if _HAS_YAML:
        try:
            parsed = _yaml.safe_load(raw)
        except _yaml.YAMLError:
            return {"_parse_error": True}
        if not parsed:
            return {}
        if not isinstance(parsed, dict):
            return {"_parse_error": True}
        return parsed

    # Fallback: top-level "key: value" lines only (indented lines are ignored).
    result: dict = {}
    for line in raw.splitlines():
        if ":" in line and not line.startswith(" "):
            k, _, v = line.partition(":")
            result[k.strip()] = v.strip().strip('"').strip("'")
    return result


def parse_frontmatter(text: str) -> tuple[dict, str]:
    """Split YAML frontmatter from body. Returns ({}, text) if no frontmatter.

    The body is everything after the closing ``---`` (it normally starts with
    the newline that ends the delimiter line).
    """
    if not text.startswith("---"):
        return {}, text

    closing = None
    if _FRONTMATTER_OPEN_RE.match(text):
        closing = _FRONTMATTER_CLOSE_RE.search(text, 3)

    if closing is not None:
        raw_meta = text[3:closing.start()]
        body = text[closing.end():]
    else:
        # Legacy lenient behaviour for files without line-anchored delimiters.
        parts = text.split("---", 2)
        if len(parts) < 3:
            return {}, text
        raw_meta, body = parts[1], parts[2]

    return _parse_yaml_block(raw_meta.strip()), body


def parse_task_blocks(body: str) -> list[dict]:
    """Extract task blocks, injecting title from the nearest preceding ### heading."""
    headings = [(m.start(), m.group(1).strip()) for m in _HEADING_RE.finditer(body)]
    results = []
    for m in _TASK_BLOCK_RE.finditer(body):
        task = _parse_yaml_block(m.group(1).strip())
        if "title" not in task:
            preceding = [title for pos, title in headings if pos < m.start()]
            if preceding:
                task["title"] = preceding[-1]
        results.append(task)
    return results


def get_tasks_from_workplan(meta: dict, body: str) -> list[dict]:
    """Get tasks from workplan — handles both ```task``` blocks and tasks: YAML list."""
    blocks = parse_task_blocks(body)
    if blocks:
        return blocks
    # Fallback: tasks embedded as a YAML list in frontmatter (activity-core style)
    tasks = meta.get("tasks")
    if isinstance(tasks, list):
        # Entries that are not mappings cannot describe a task; drop them so
        # callers can use ``.get`` on every item.
        return [t for t in tasks if isinstance(t, dict)]
    return []


# ---------------------------------------------------------------------------
# File update helpers
# ---------------------------------------------------------------------------

def _add_frontmatter_field(file_path: Path, key: str, value: str) -> None:
    """Insert key: "value" into frontmatter before the closing --- line.

    If the frontmatter already has a top-level *key* whose value is empty or a
    null placeholder, that line is replaced instead, avoiding a duplicate key.
    Does nothing when no closing ``---`` line is found.
    """
    text = file_path.read_text(encoding="utf-8")
    lines = text.split("\n")
    close_idx = None
    for i, line in enumerate(lines[1:], 1):
        if line.strip() == "---":
            close_idx = i
            break
    if close_idx is None:
        return

    new_line = f'{key}: "{value}"'
    for i in range(1, close_idx):
        existing_key, sep, existing_val = lines[i].partition(":")
        if sep and existing_key == key and not _clean_ref_id(existing_val):
            lines[i] = new_line
            break
    else:
        lines.insert(close_idx, new_line)
    file_path.write_text("\n".join(lines), encoding="utf-8")


def _inject_task_id_into_block(
    file_path: Path, field_name: str, field_value: str, match_id: str
) -> bool:
    """Inject state_hub_task_id into the ```task``` block whose id == match_id.

    A block that already carries a real value for *field_name* is left alone.
    Returns True if the file was changed.
    """
    text = file_path.read_text(encoding="utf-8")
    new_line = f'{field_name}: "{field_value}"'
    field_line_re = re.compile(rf"^{re.escape(field_name)}:.*$", re.MULTILINE)

    def _replace(m: re.Match) -> str:
        block_content = m.group(1)
        task_meta = _parse_yaml_block(block_content.strip())
        if str(task_meta.get("id", "")) != match_id:
            return m.group(0)
        if _clean_ref_id(task_meta.get(field_name)):
            return m.group(0)
        # Replace existing null/~ line if present, otherwise append.
        # A callable replacement keeps backslashes in the value literal.
        new_content = field_line_re.sub(lambda _m: new_line, block_content)
        if new_content == block_content:
            new_content = block_content.rstrip() + "\n" + new_line
        return f"```task\n{new_content}\n```"

    new_text = _TASK_BLOCK_RE.sub(_replace, text)
    if new_text != text:
        file_path.write_text(new_text, encoding="utf-8")
        return True
    return False


def _inject_task_id_frontmatter_list(
    file_path: Path, field_value: str, match_id: str
) -> bool:
    """Inject state_hub_task_id into a task entry in frontmatter tasks: list.

    Requires PyYAML (the frontmatter is re-serialised). Entries whose id is
    missing or a null placeholder are filled; entries with a real id are kept.
    Returns True if the file was rewritten.
    """
    if not _HAS_YAML:
        return False
    text = file_path.read_text(encoding="utf-8")
    meta, body = parse_frontmatter(text)
    tasks = meta.get("tasks")
    if not isinstance(tasks, list):
        return False

    changed = False
    for entry in tasks:
        if not isinstance(entry, dict):
            continue
        if str(entry.get("id", "")) != match_id:
            continue
        if _clean_ref_id(entry.get("state_hub_task_id")):
            continue
        entry["state_hub_task_id"] = field_value
        changed = True
    if not changed:
        return False

    try:
        new_meta_str = _yaml.dump(meta, allow_unicode=True, default_flow_style=False)
        file_path.write_text(f"---\n{new_meta_str}---{body}", encoding="utf-8")
    except (_yaml.YAMLError, OSError, ValueError):
        return False
    return True


# ---------------------------------------------------------------------------
# API helpers
# ---------------------------------------------------------------------------

def _api_get(api_base: str, path: str, params: dict | None = None) -> Any:
    """GET *path* and return the decoded JSON, or None on any failure.

    None is also returned when httpx is not installed. Parameters whose value
    is None are not sent.
    """
    if not _HAS_HTTPX:
        return None
    # Only append trailing slash to the path component, not to query strings
    if "?" not in path and not path.endswith("/"):
        path += "/"
    try:
        with _httpx.Client(base_url=api_base, timeout=10.0, follow_redirects=True) as c:
            filtered = {k: v for k, v in (params or {}).items() if v is not None}
            r = c.get(path, params=filtered if filtered else None)
            r.raise_for_status()
            return r.json()
    except Exception:
        # Deliberately best-effort: callers treat None as "not available".
        return None


def _api_patch(api_base: str, path: str, body: dict) -> Any:
    """PATCH *path* with a JSON body; returns JSON, None (no httpx) or {"_error": ...}."""
    if not _HAS_HTTPX:
        return None
    if not path.endswith("/"):
        path += "/"
    try:
        with _httpx.Client(base_url=api_base, timeout=10.0, follow_redirects=True) as c:
            r = c.patch(path, json=body)
            r.raise_for_status()
            return r.json()
    except Exception as exc:
        # Return a sentinel dict so callers can distinguish "API error" from "success"
        # and report it rather than silently dropping the fix.
        return {"_error": str(exc)}


def _api_post(api_base: str, path: str, body: dict) -> Any:
    """POST *path* with a JSON body and return the decoded JSON, or None on any failure."""
    if not _HAS_HTTPX:
        return None
    if not path.endswith("/"):
        path += "/"
    try:
        with _httpx.Client(base_url=api_base, timeout=10.0, follow_redirects=True) as c:
            r = c.post(path, json=body)
            r.raise_for_status()
            return r.json()
    except Exception:
        # Deliberately best-effort: callers treat None as "request failed".
        return None


# ---------------------------------------------------------------------------
# Core check engine
# ---------------------------------------------------------------------------

def resolve_repo_path(repo: dict, override: str | None = None) -> str:
    """Resolve the local filesystem path for a repo on the current machine.

    Priority:
      1. Explicit --repo-path CLI override
      2. host_paths[current_hostname] — per-machine path registered via
         POST /repos/{slug}/paths/
      3. local_path — legacy single-path field (backward compat)
    """
    if override:
        return override
    hostname = socket.gethostname()
    host_paths = repo.get("host_paths") or {}
    return host_paths.get(hostname) or repo.get("local_path") or ""


def _git_output(args: list[str], cwd: str) -> str:
    """Run ``git <args>`` in *cwd*; return stripped stdout, or "" if it fails."""
    try:
        return subprocess.check_output(
            ["git", *args],
            cwd=cwd,
            stderr=subprocess.DEVNULL,
            text=True,
        ).strip()
    except (subprocess.CalledProcessError, FileNotFoundError, OSError):
        return ""


def _infer_slug_from_path(api_base: str, path: str) -> "tuple[str, str] | None":
    """Identify a registered repo from a local checkout path.

    Strategy (in order):
      1. Root-commit fingerprint — ``git rev-list --max-parents=0 HEAD`` produces
         the same SHA-1 on every clone, independent of remote URL or checkout
         path. Looked up via ``GET /repos/by-fingerprint?hash=``.
      2. Remote URL fallback — exact string match against ``remote_url`` field.
         Less reliable across machines (SSH aliases, HTTP vs HTTPS, etc.) but
         useful when fingerprint is not yet stored.

    Returns ``(slug, git_root)`` on success, ``None`` if no match found.
    """
    from urllib.parse import quote

    git_root = _git_output(["rev-parse", "--show-toplevel"], path)
    if not git_root:
        return None

    # Strategy 1: fingerprint lookup (most reliable)
    # NOTE(review): a history with several root commits prints one SHA per
    # line here — confirm how the hub registers such repos.
    fingerprint = _git_output(["rev-list", "--max-parents=0", "HEAD"], git_root)
    # Get local remote URL once — used for both disambiguation and fallback
    remote_url = _git_output(["remote", "get-url", "origin"], git_root)

    if fingerprint:
        # Try fingerprint + remote URL for precise match
        if remote_url:
            candidates = _api_get(
                api_base,
                f"/repos/by-fingerprint?hash={fingerprint}&remote_url={quote(remote_url, safe='')}",
            )
            if isinstance(candidates, list) and len(candidates) == 1:
                return candidates[0]["slug"], git_root

        # Fingerprint alone (works when repos don't share ancestry)
        candidates = _api_get(api_base, f"/repos/by-fingerprint?hash={fingerprint}")
        if isinstance(candidates, list):
            if len(candidates) == 1:
                return candidates[0]["slug"], git_root
            if len(candidates) > 1:
                # Disambiguate: prefer the repo whose slug appears in the git_root path
                for repo in candidates:
                    if repo["slug"] in git_root:
                        return repo["slug"], git_root
                # Can't disambiguate — return first match with a warning
                print(
                    f" WARNING: {len(candidates)} repos share fingerprint {fingerprint[:12]}… "
                    f"— using '{candidates[0]['slug']}'. "
                    "Set remote_url on each repo for accurate matching.",
                    file=sys.stderr,
                )
                return candidates[0]["slug"], git_root

    # Strategy 2: remote URL exact match (fallback)
    if remote_url:
        repo = _api_get(api_base, f"/repos/by-remote?url={quote(remote_url, safe='')}")
        if repo and isinstance(repo, dict) and "slug" in repo:
            return repo["slug"], git_root

    return None


def _load_workplan_files(
    workplans_dir: Path, report: ConsistencyReport
) -> list[tuple[Path, dict, str]]:
    """Read and parse every ``*.md`` workplan; report unreadable ones as C-02."""
    loaded: list[tuple[Path, dict, str]] = []
    for wp_file in sorted(workplans_dir.glob("*.md")):
        try:
            text = wp_file.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError) as e:
            # A file that is not valid UTF-8 is reported, not allowed to abort the run.
            report.add(severity="FAIL", check_id="C-02",
                       message=f"Cannot read file: {e}", file_path=wp_file.name)
            continue
        if not text.startswith("---"):
            report.add(severity="FAIL", check_id="C-02",
                       message="No YAML frontmatter found", file_path=wp_file.name)
            continue
        meta, body = parse_frontmatter(text)
        if not meta or meta.get("_parse_error"):
            report.add(severity="FAIL", check_id="C-02",
                       message="YAML frontmatter parse error", file_path=wp_file.name)
            continue
        loaded.append((wp_file, meta, body))
    return loaded


def _check_workplan_tasks(
    api_base: str,
    ws: dict,
    ws_id: str,
    wp_file: Path,
    meta: dict,
    body: str,
    report: ConsistencyReport,
) -> Any:
    """Run the task-level checks (task C-03, C-10, C-11, C-12, C-15) for one workplan.

    Returns the raw ``/tasks`` response for the workstream so the caller can
    reuse it (it is a list on success, anything else on failure).
    """
    fname = wp_file.name
    tasks = get_tasks_from_workplan(meta, body)
    db_tasks = _api_get(api_base, "/tasks", {"workstream_id": ws_id})
    db_task_by_id: dict[str, dict] = {}
    if isinstance(db_tasks, list):
        db_task_by_id = {t["id"]: t for t in db_tasks}

    file_task_sh_ids: set[str] = set()
    for task in tasks:
        if task.get("_parse_error"):
            continue
        t_id = str(task.get("id", "")).strip()
        t_sh_id = _clean_ref_id(task.get("state_hub_task_id"))
        t_status = str(task.get("status", "")).strip()

        if t_sh_id:
            file_task_sh_ids.add(t_sh_id)
            # Reuse the workstream's task list; only tasks missing from it
            # (e.g. linked to another workstream) need their own request.
            db_task = db_task_by_id.get(t_sh_id) or _api_get(api_base, f"/tasks/{t_sh_id}")
            if db_task is None:
                report.add(
                    severity="FAIL",
                    check_id="C-03",
                    message=f"state_hub_task_id {t_sh_id[:8]}… not found in DB",
                    file_path=f"{fname}#{t_id}",
                    db_id=t_sh_id,
                    fixable=False,
                )
                continue

            # C-10 / C-15: task status drift
            db_t_status = db_task.get("status", "")
            if t_status and db_t_status and t_status != db_t_status:
                db_rank = STATUS_ORDER.get(db_t_status, 0)
                file_rank = STATUS_ORDER.get(t_status, 0)
                if db_rank >= file_rank:
                    # C-15: DB is already at the same rank or ahead — prevent
                    # regression. Writeback syncs the file to match DB.
                    report.add(
                        severity="WARN",
                        check_id="C-15",
                        message=(
                            f"DB task '{t_id}' is ahead of file "
                            f"(db={db_t_status!r}, file={t_status!r}) "
                            f"— regression prevented; writeback will sync file"
                        ),
                        file_path=f"{fname}#{t_id}",
                        db_id=t_sh_id,
                        file_value=t_status,
                        db_value=db_t_status,
                        fixable=True,
                        _fix_context={
                            "task_id": t_sh_id,
                            "wp_file": str(wp_file),
                            "task_block_id": t_id,
                            "db_status": db_t_status,
                        },
                    )
                else:
                    # C-10: file is ahead — apply file→DB sync (normal drift)
                    report.add(
                        severity="WARN",
                        check_id="C-10",
                        message=(
                            f"Task status drift '{t_id}': "
                            f"file={t_status!r} db={db_t_status!r} (file wins)"
                        ),
                        file_path=f"{fname}#{t_id}",
                        db_id=t_sh_id,
                        file_value=t_status,
                        db_value=db_t_status,
                        fixable=True,
                        _fix_context={"task_id": t_sh_id, "status": t_status},
                    )
        elif t_id:
            # C-11: task exists in file but not linked to DB
            report.add(
                severity="WARN",
                check_id="C-11",
                message=f"Task '{t_id}' has no state_hub_task_id",
                file_path=f"{fname}#{t_id}",
                fixable=True,
                _fix_context={
                    "ws_id": ws_id,
                    "ws_status": ws.get("status", ""),
                    "task": task,
                    "wp_file": str(wp_file),
                    "meta": meta,
                    "body": body,
                },
            )

    # C-12: DB tasks with no file backing
    if isinstance(db_tasks, list):
        ws_finished = ws.get("status", "") in ("completed", "archived")
        for db_t in db_tasks:
            if db_t["id"] in file_task_sh_ids:
                continue
            db_t_status = db_t.get("status", "")
            open_task = db_t_status not in ("done", "cancelled")
            report.add(
                severity="WARN",
                check_id="C-12",
                message=(
                    f"DB task '{db_t.get('title', '')}' "
                    f"(id={db_t['id'][:8]}…, status={db_t_status}) "
                    f"in workstream '{ws.get('slug')}' has no file backing"
                ),
                db_id=db_t["id"],
                # Auto-cancel fixable when workstream is finished and task is open
                fixable=ws_finished and open_task,
                _fix_context={
                    "task_id": db_t["id"],
                    "ws_finished": ws_finished,
                },
            )

    return db_tasks


def _check_workplan(
    api_base: str,
    repo_id: str,
    repo_slug: str,
    wp_file: Path,
    meta: dict,
    body: str,
    report: ConsistencyReport,
) -> None:
    """Run the per-workplan checks (C-03..C-06, C-09..C-13, C-15) for one parsed file."""
    fname = wp_file.name
    ws_id = _clean_ref_id(meta.get("state_hub_workstream_id"))
    file_status = str(meta.get("status", "")).strip()
    file_title = str(meta.get("title", "")).strip()
    file_domain = str(meta.get("domain", "")).strip()

    if not ws_id:
        # C-06: workplan not linked to any DB workstream
        report.add(
            severity="WARN",
            check_id="C-06",
            message="Workplan has no state_hub_workstream_id — not indexed in DB",
            file_path=fname,
            file_value=file_title or fname,
            fixable=True,
            _fix_context={
                "wp_file": str(wp_file),
                "meta": meta,
                "body": body,
                "repo_id": repo_id,
                "domain": file_domain,
            },
        )
        return

    ws = _api_get(api_base, f"/workstreams/{ws_id}")
    if ws is None:
        # C-03: stale workstream reference
        report.add(
            severity="FAIL",
            check_id="C-03",
            message=f"state_hub_workstream_id {ws_id[:8]}… not found in DB (stale reference)",
            file_path=fname,
            db_id=ws_id,
            fixable=False,
        )
        return

    # C-09: repo mismatch — file is here but DB says different repo
    db_repo_id: str = ws.get("repo_id") or ""
    if db_repo_id != repo_id:
        report.add(
            severity="FAIL",
            check_id="C-09",
            message=(
                f"Workstream '{ws.get('slug')}' repo_id in DB is "
                f"{db_repo_id[:8] if db_repo_id else 'None'} "
                f"but backing file lives in {repo_slug} ({repo_id[:8]}…)"
            ),
            file_path=fname,
            db_id=ws_id,
            file_value=repo_id,
            db_value=db_repo_id,
            fixable=True,
            _fix_context={"ws_id": ws_id, "correct_repo_id": repo_id},
        )
        # Continue to check drift even with mismatched repo

    # C-04: status drift — normalise file value before comparing so that
    # "done" (file) vs "completed" (DB) is not treated as drift.
    db_status = ws.get("status", "")
    normalised_file_status = normalise_workstream_status(file_status)
    if file_status and db_status and normalised_file_status != db_status:
        report.add(
            severity="WARN",
            check_id="C-04",
            message=(
                f"Status drift in '{ws.get('slug')}': "
                f"file={file_status!r} db={db_status!r} (file wins)"
            ),
            file_path=fname,
            db_id=ws_id,
            file_value=file_status,
            db_value=db_status,
            fixable=True,
            _fix_context={
                "ws_id": ws_id,
                "field": "status",
                "value": normalised_file_status,  # always send DB-valid value
            },
        )

    # C-05: title drift
    db_title = ws.get("title", "")
    if file_title and db_title and file_title != db_title:
        report.add(
            severity="WARN",
            check_id="C-05",
            message=(
                f"Title drift in '{ws.get('slug')}': "
                f"file={file_title!r} db={db_title!r} (file wins)"
            ),
            file_path=fname,
            db_id=ws_id,
            file_value=file_title,
            db_value=db_title,
            fixable=True,
            _fix_context={"ws_id": ws_id, "field": "title", "value": file_title},
        )

    # C-10, C-11, C-12, C-15: task-level checks
    db_tasks = _check_workplan_tasks(api_base, ws, ws_id, wp_file, meta, body, report)

    # C-13: all DB tasks done but workstream still active — worker forgot to close
    if db_status == "active" and isinstance(db_tasks, list) and db_tasks:
        non_terminal = [
            t for t in db_tasks if t.get("status") not in ("done", "cancelled")
        ]
        if not non_terminal:
            report.add(
                severity="WARN",
                check_id="C-13",
                message=(
                    f"All {len(db_tasks)} DB tasks for '{ws.get('slug')}' are "
                    f"done/cancelled but workstream status is still 'active' — "
                    f"worker likely forgot update_workstream_status()"
                ),
                file_path=fname,
                db_id=ws_id,
                db_value="active",
                fixable=True,
                _fix_context={
                    "ws_id": ws_id,
                    "field": "status",
                    "value": "completed",
                },
            )


def check_repo(api_base: str, repo_slug: str, repo_path_override: str | None = None) -> ConsistencyReport:
    """Run all consistency checks for a registered repo.

    Args:
        api_base: Base URL of the state-hub API.
        repo_slug: Slug of the registered repo to check.
        repo_path_override: Local checkout path to use instead of the
            registered one.

    Returns:
        A ConsistencyReport holding every issue found. No fixes are applied.
    """
    repo = _api_get(api_base, f"/repos/{repo_slug}")
    if repo is None:
        report = ConsistencyReport(repo_slug=repo_slug, repo_path="(unknown)")
        report.add(
            severity="FAIL",
            check_id="C-00",
            message=f"Repo '{repo_slug}' not found in state-hub DB",
            fixable=False,
        )
        return report

    repo_id: str = repo["id"]
    repo_path: str = resolve_repo_path(repo, repo_path_override)
    report = ConsistencyReport(repo_slug=repo_slug, repo_path=repo_path)

    if not repo_path:
        report.add(
            severity="FAIL",
            check_id="C-00",
            message=f"Repo '{repo_slug}' has no local_path — cannot check workplan files",
            fixable=False,
        )
        _check_orphan_db(api_base, repo_id, set(), report)
        return report

    repo_dir = Path(repo_path)
    workplans_dir = repo_dir / "workplans"

    # C-01: workplans/ directory missing
    if not workplans_dir.is_dir():
        report.add(
            severity="FAIL",
            check_id="C-01",
            message=(
                "workplans/ directory missing — ADR-001 requires workplan files "
                "at /workplans/-.md"
            ),
            fixable=False,
        )
        _check_orphan_db(api_base, repo_id, set(), report)
        return report

    # Parse workplan files (C-02)
    workplan_infos = _load_workplan_files(workplans_dir, report)
    file_ws_ids: dict[str, tuple[Path, dict, str]] = {}  # ws_id → (file, meta, body)
    for wp_file, meta, body in workplan_infos:
        ws_id = _clean_ref_id(meta.get("state_hub_workstream_id"))
        if ws_id:
            file_ws_ids[ws_id] = (wp_file, meta, body)

    # Per-workplan checks
    for wp_file, meta, body in workplan_infos:
        _check_workplan(api_base, repo_id, repo_slug, wp_file, meta, body, report)

    # C-07 / C-08: orphan DB workstreams (have repo_id=this_repo but no backing file)
    _check_orphan_db(api_base, repo_id, set(file_ws_ids.keys()), report)

    # C-14: ghost duplicate — active workstream on same topic with no repo_id whose
    # title matches a file-backed workstream. Root cause: create_workstream() called
    # before the workplan file was written; fix-consistency then created a second
    # workstream from the file, leaving the first as an invisible orphan.
    _check_ghost_duplicates(api_base, workplan_infos, file_ws_ids, report)

    return report


def _check_orphan_db(
    api_base: str, repo_id: str, file_ws_ids: set[str], report: ConsistencyReport
) -> None:
    """Flag DB workstreams with repo_id=this_repo that have no backing workplan file."""
    all_ws = _api_get(api_base, "/workstreams", {"repo_id": repo_id})
    if not isinstance(all_ws, list):
        return
    for ws in all_ws:
        ws_id = ws["id"]
        if ws_id in file_ws_ids:
            continue
        ws_status = ws.get("status", "")
        ws_slug = ws.get("slug", "")
        if ws_status == "active":
            report.add(
                severity="FAIL",
                check_id="C-07",
                message=(
                    f"Active DB workstream '{ws_slug}' (id={ws_id[:8]}…) "
                    f"has no backing workplan file — ADR-001 violation"
                ),
                db_id=ws_id,
                fixable=False,
            )
        elif ws_status in ("completed", "archived"):
            report.add(
                severity="INFO",
                check_id="C-08",
                message=(
                    f"Completed/archived DB workstream '{ws_slug}' "
                    f"(id={ws_id[:8]}…, status={ws_status}) has no backing workplan file"
                ),
                db_id=ws_id,
                fixable=False,
            )


def _check_ghost_duplicates(
    api_base: str,
    workplan_infos: list[tuple],
    file_ws_ids: dict[str, tuple],
    report: ConsistencyReport,
) -> None:
    """C-14: detect active workstreams with repo_id=null whose title matches a
    file-backed workstream — these are ghosts created by premature
    create_workstream() calls before the workplan file existed.
    """
    # Build lookup: normalised title → file-backed workstream id
    file_titles: dict[str, str] = {}
    for _, meta, _ in workplan_infos:
        ws_id = _clean_ref_id(meta.get("state_hub_workstream_id"))
        title = str(meta.get("title", "")).strip().lower()
        if title and ws_id:
            file_titles[title] = ws_id

    if not file_titles:
        return

    # Gather topic_ids from all file-backed workstreams so we can query by topic
    topic_ids: set[str] = set()
    for ws_id in file_ws_ids:
        ws = _api_get(api_base, f"/workstreams/{ws_id}")
        if ws and ws.get("topic_id"):
            topic_ids.add(ws["topic_id"])

    # Sorted so the order of reported issues is stable between runs.
    for topic_id in sorted(topic_ids):
        topic_ws = _api_get(api_base, "/workstreams", {"topic_id": topic_id, "status": "active"})
        if not isinstance(topic_ws, list):
            continue
        for ws in topic_ws:
            ws_id = ws["id"]
            if ws_id in file_ws_ids:
                continue  # legitimately linked
            if ws.get("repo_id"):
                continue  # C-07 covers repo-scoped orphans
            ws_title = (ws.get("title") or "").strip().lower()
            if ws_title in file_titles:
                file_backed_id = file_titles[ws_title]
                report.add(
                    severity="WARN",
                    check_id="C-14",
                    message=(
                        f"Ghost duplicate: active workstream '{ws.get('slug')}' "
                        f"(id={ws_id[:8]}…, repo_id=null) has same title as "
                        f"file-backed workstream {file_backed_id[:8]}… — "
                        f"likely created via create_workstream() before workplan file existed; "
                        f"archive it"
                    ),
                    db_id=ws_id,
                    fixable=False,
                )


# ---------------------------------------------------------------------------
# Git sync (T02–T04: pull gate, writeback, push seal) — see repo_sync.py
# ---------------------------------------------------------------------------
# repo_sync.py owns the push-seal invariant and all git lifecycle primitives.
# The aliases below keep internal call sites and existing tests unchanged.
from repo_sync import ( # noqa: E402 (import after top-level imports intentional) count_local_ahead as _detect_ahead_of_remote, count_remote_ahead as _count_remote_ahead, pull_ff as _git_pull, push_ff as _git_push, ) def _detect_behind_remote(repo_path: str) -> bool: """True if remote has commits the local repo lacks (C-16 predicate). Delegates to repo_sync.count_remote_ahead, which fetches before counting. Returns False on any error so C-16 is never spuriously triggered. """ return _count_remote_ahead(repo_path) > 0 def _patch_task_status_in_file( file_path: Path, task_block_id: str, new_status: str ) -> bool: """Update the ``status:`` field inside a task block identified by its id. Only modifies lines inside a ```task … ``` fenced block whose ``id:`` matches *task_block_id*. Returns True if the file was changed. """ text = file_path.read_text(encoding="utf-8") def _replace(m: re.Match) -> str: # m.group(0) is the full ```task...``` block including fences. # m.group(1) is the inner YAML content (no fences). full_block = m.group(0) inner = m.group(1).strip() task_meta = _parse_yaml_block(inner) if str(task_meta.get("id", "")).strip() != task_block_id: return full_block # Replace the status line only, leave everything else untouched. return re.sub( r"^(status:\s*)\S+", rf"\g<1>{new_status}", full_block, flags=re.MULTILINE, ) new_text = _TASK_BLOCK_RE.sub(_replace, text) if new_text != text: file_path.write_text(new_text, encoding="utf-8") return True return False def _git_commit_writeback( repo_path: str, file_path: Path, changes: list[str] ) -> bool: """Stage *file_path* and commit with a standard writeback message. Returns True on success, False on any error (errors are logged to stderr but do not abort the consistency run). 
""" from datetime import date as _date summary = "\n".join(f" - {c}" for c in changes) msg = ( f"chore(consistency): sync task status from DB [auto]\n\n" f"Updated by fix-consistency on {_date.today().isoformat()}:\n" f"{summary}" ) import os as _os # Pass GIT_CUSTODIAN_SYNC=1 so the post-commit hook can detect it is # running from within a sync pass and exit early, preventing re-entrancy. sync_env = {**_os.environ, "GIT_CUSTODIAN_SYNC": "1"} try: subprocess.run( ["git", "-C", repo_path, "add", str(file_path)], check=True, capture_output=True, env=sync_env, ) subprocess.run( ["git", "-C", repo_path, "commit", "-m", msg], check=True, capture_output=True, env=sync_env, ) return True except subprocess.CalledProcessError as e: print( f"WARN: git commit failed for writeback: {e.stderr.decode().strip()}", file=sys.stderr, ) return False # --------------------------------------------------------------------------- # Worker orientation brief (.custodian-brief.md) # --------------------------------------------------------------------------- _BRIEF_HEADER = "" _TASK_STATUS_ICON = {"done": "✓", "cancelled": "✗", "in_progress": "►", "blocked": "!", "todo": "·"} _OPEN_STATUSES = {"todo", "in_progress", "blocked"} def _write_custodian_brief(api_base: str, repo_slug: str, repo_path: str) -> bool: """Generate .custodian-brief.md at the repo root and git-commit if changed. The brief gives any agent — including subagents without MCP access and workers on remote machines — instant orientation without a live hub connection. Returns True if the file was written (content changed). """ import datetime as _dt from datetime import timezone as _tz repo = _api_get(api_base, f"/repos/{repo_slug}") if not repo: return False repo_id: str = repo.get("id", "") domain_slug: str = "" # Resolve domain slug: prefer active workstreams, fall back to any workstream # so that a fully-completed repo doesn't degrade to "(unknown)". 
workstreams = _api_get(api_base, "/workstreams", {"repo_id": repo_id, "status": "active"}) or [] _ws_for_domain = workstreams if (isinstance(workstreams, list) and workstreams) else [] if not _ws_for_domain: all_ws = _api_get(api_base, "/workstreams", {"repo_id": repo_id}) or [] _ws_for_domain = all_ws if isinstance(all_ws, list) else [] if _ws_for_domain: topic = _api_get(api_base, f"/topics/{_ws_for_domain[0].get('topic_id', '')}") if topic: domain_slug = topic.get("domain_slug", "") # Active repo goal (first active one if multiple) goal_text = "" goals = _api_get(api_base, "/repo-goals", {"repo_slug": repo_slug}) or [] if isinstance(goals, list): active_goals = [g for g in goals if g.get("status") == "active"] if active_goals: g = active_goals[0] goal_text = g.get("title", "") or g.get("description", "") now_utc = _dt.datetime.now(_tz.utc) ts = now_utc.strftime("%Y-%m-%d %H:%M UTC") lines = [ _BRIEF_HEADER, f"# Custodian Brief — {repo_slug}", "", f"**Domain:** {domain_slug or '(unknown)'} ", f"**Last synced:** {ts} ", "**State Hub:** http://127.0.0.1:8000 *(adjust if running on a remote machine)*", "", ] if goal_text: lines += ["## Current Goal", "", goal_text, ""] if isinstance(workstreams, list) and workstreams: lines.append("## Active Workstreams") for ws in workstreams: ws_title = ws.get("title", ws.get("slug", "?")) ws_id = ws["id"] tasks = _api_get(api_base, "/tasks", {"workstream_id": ws_id}) or [] if not isinstance(tasks, list): tasks = [] done = sum(1 for t in tasks if t.get("status") in ("done", "cancelled")) total = len(tasks) pct = f"{done}/{total}" if total else "no tasks" open_tasks = [t for t in tasks if t.get("status") in _OPEN_STATUSES] # Show blocked first, then in_progress, then todo (cap at 5) priority_order = {"blocked": 0, "in_progress": 1, "todo": 2} open_tasks.sort(key=lambda t: priority_order.get(t.get("status", "todo"), 9)) lines += [ "", f"### {ws_title}", f"Progress: {pct} done | workstream_id: `{ws_id}`", ] if open_tasks: 
lines.append("") lines.append("**Open tasks:**") for t in open_tasks[:7]: icon = _TASK_STATUS_ICON.get(t.get("status", "todo"), "·") title = t.get("title", t["id"]) tid = t["id"] status = t.get("status", "") blocker = t.get("blocking_reason", "") task_line = f"- {icon} {title} `{tid[:8]}`" if status == "blocked" and blocker: task_line += f"\n *(blocked: {blocker})*" lines.append(task_line) if len(open_tasks) > 7: lines.append(f"- … and {len(open_tasks) - 7} more open tasks") else: lines += ["## Active Workstreams", "", "*(none — repo may need first-session setup)*"] lines += [ "", "---", "## MCP Orientation (when available)", "", "If the state-hub MCP server is reachable, call:", f"`get_domain_summary(\"{domain_slug}\")`", "This provides richer cross-domain context.", "If the MCP call fails, use this file as your orientation source.", ] content = "\n".join(lines) + "\n" brief_path = Path(repo_path) / ".custodian-brief.md" existing = brief_path.read_text(encoding="utf-8") if brief_path.exists() else "" # Strip the timestamp line before comparing to avoid spurious writes def _strip_ts(text: str) -> str: return "\n".join( ln for ln in text.splitlines() if not ln.startswith("**Last synced:**") ) if _strip_ts(content) == _strip_ts(existing): return False # no meaningful change brief_path.write_text(content, encoding="utf-8") # Commit the brief so remote workers can pull it _git_commit_writeback( repo_path, brief_path, [f"update .custodian-brief.md for {repo_slug}"], ) return True # --------------------------------------------------------------------------- # Fix engine # --------------------------------------------------------------------------- def fix_repo( api_base: str, repo_slug: str, repo_path_override: str | None = None, no_writeback: bool = False, ) -> ConsistencyReport: """Run checks then apply all auto-fixable issues. 
Returns updated report.""" report = check_repo(api_base, repo_slug, repo_path_override) # Auto-register this machine's path in host_paths so future runs work # without --repo-path. Idempotent: skipped when already correct. repo_path = report.repo_path if repo_path: repo_record = _api_get(api_base, f"/repos/{repo_slug}") if repo_record: hostname = socket.gethostname() if (repo_record.get("host_paths") or {}).get(hostname) != repo_path: result = _api_post( api_base, f"/repos/{repo_slug}/paths", {"host": hostname, "path": repo_path}, ) if result and "_error" not in result: report.fixes_applied.append( f"host_paths[{hostname}] → {repo_path}" ) # T02 — pull gate: warn and skip all write operations when local repo is # behind its remote tracking branch. if repo_path and _detect_behind_remote(repo_path): report.add( severity="WARN", check_id="C-16", message=( f"Repo '{repo_slug}' is behind its remote tracking branch — " f"pull before fixing to avoid clobbering remote progress. " f"Run: git -C {repo_path} pull --ff-only" ), fixable=False, ) report.fixes_applied.append( "C-16: all write operations skipped — local repo is behind remote" ) return report # C-17: backlog guard — if local has unpushed commits from a prior failed push, # try to push them before making more. Skipping writes prevents runaway divergence. 
if repo_path: ahead = _detect_ahead_of_remote(repo_path) if ahead > 0: push_ok, push_msg = _git_push(repo_path) if not push_ok: report.add( severity="WARN", check_id="C-17", message=( f"Repo '{repo_slug}' has {ahead} unpushed commit(s) and push " f"failed ({push_msg}) — skipping writes to prevent runaway divergence" ), fixable=False, ) report.fixes_applied.append( "C-17: all write operations skipped — unpushed commits, push failed" ) return report # Push succeeded — local is now in sync; proceed normally report.fixes_applied.append(f"C-17 cleared: pushed {ahead} backlogged commit(s)") fixable = [i for i in report.issues if i.fixable] for issue in fixable: ctx = issue._fix_context try: if issue.check_id in ("C-04", "C-05", "C-13"): ws_id = ctx["ws_id"] result = _api_patch(api_base, f"/workstreams/{ws_id}", {ctx["field"]: ctx["value"]}) if result is not None and "_error" not in result: report.fixes_applied.append( f"{issue.check_id} fixed: workstream {ws_id[:8]}… " f"{ctx['field']} → {ctx['value']!r}" ) elif result is not None: report.fixes_applied.append( f"{issue.check_id} FAILED: workstream {ws_id[:8]}… " f"{ctx['field']} → {ctx['value']!r}: {result['_error']}" ) elif issue.check_id == "C-06": wp_file = Path(ctx["wp_file"]) meta = ctx["meta"] domain = ctx["domain"] repo_id_val = ctx["repo_id"] body = ctx.get("body", "") wp_id = str(meta.get("id", "")).strip() title = str(meta.get("title", "")).strip() status = str(meta.get("status", "active")).strip() if status not in ("active", "completed", "archived"): status = "active" # Find topic_id for this domain topics = _api_get(api_base, "/topics") topic_id = None if isinstance(topics, list): for t in topics: if t.get("domain_slug") == domain: topic_id = t["id"] break if topic_id is None: report.fixes_applied.append( f"C-06 SKIP {wp_id}: no topic found for domain '{domain}'" ) continue slug = re.sub(r"[^a-z0-9-]", "-", wp_id.lower()).strip("-") ws_data = _api_post(api_base, "/workstreams", { "topic_id": topic_id, 
"repo_id": repo_id_val, "slug": slug, "title": title or wp_id, "status": status, "owner": str(meta.get("owner", "")).strip() or None, }) if ws_data is None: report.fixes_applied.append( f"C-06 FAIL {wp_id}: could not create workstream in DB" ) continue new_ws_id = ws_data["id"] _add_frontmatter_field(wp_file, "state_hub_workstream_id", new_ws_id) report.fixes_applied.append( f"C-06 fixed: created workstream {new_ws_id[:8]}… " f"for {wp_id}, wrote ID to {wp_file.name}" ) # Create tasks and inject IDs tasks = get_tasks_from_workplan(meta, body) for task in tasks: if task.get("_parse_error"): continue t_id = str(task.get("id", "")).strip() if not t_id: continue t_status = str(task.get("status", "todo")).strip() if t_status not in VALID_TASK_STATUSES: t_status = "todo" t_priority = str(task.get("priority", "medium")).strip() if t_priority not in VALID_TASK_PRIORITIES: t_priority = "medium" t_data = _api_post(api_base, "/tasks", { "workstream_id": new_ws_id, "title": str(task.get("title", t_id)).strip() or t_id, "status": t_status, "priority": t_priority, "assignee": task.get("assignee") or None, }) if t_data: t_db_id = t_data["id"] injected = _inject_task_id_into_block( wp_file, "state_hub_task_id", t_db_id, t_id ) if not injected: _inject_task_id_frontmatter_list(wp_file, t_db_id, t_id) report.fixes_applied.append(f" + task {t_id} → {t_db_id[:8]}…") elif issue.check_id == "C-09": ws_id = ctx["ws_id"] correct_repo_id = ctx["correct_repo_id"] result = _api_patch(api_base, f"/workstreams/{ws_id}", {"repo_id": correct_repo_id}) if result is not None: report.fixes_applied.append( f"C-09 fixed: workstream {ws_id[:8]}… " f"repo_id → {correct_repo_id[:8]}…" ) elif issue.check_id == "C-10": task_id = ctx["task_id"] status = ctx["status"] result = _api_patch(api_base, f"/tasks/{task_id}", {"status": status}) if result is not None: report.fixes_applied.append( f"C-10 fixed: task {task_id[:8]}… status → {status!r}" ) elif issue.check_id == "C-11": ws_id = ctx["ws_id"] ws_status = 
ctx.get("ws_status", "") task = ctx["task"] wp_file = Path(ctx["wp_file"]) meta = ctx["meta"] body = ctx.get("body", "") t_id = str(task.get("id", "")).strip() # Skip creating tasks for finished workstreams — the workstream is # done/archived so unlinked tasks are stale file artefacts, not gaps. if ws_status in ("completed", "archived"): report.fixes_applied.append( f"C-11 skipped: task '{t_id}' in {ws_status} workstream — not created" ) else: t_status = str(task.get("status", "todo")).strip() if t_status not in VALID_TASK_STATUSES: t_status = "todo" t_priority = str(task.get("priority", "medium")).strip() if t_priority not in VALID_TASK_PRIORITIES: t_priority = "medium" t_data = _api_post(api_base, "/tasks", { "workstream_id": ws_id, "title": str(task.get("title", t_id)).strip() or t_id, "status": t_status, "priority": t_priority, "assignee": task.get("assignee") or None, }) if t_data: t_db_id = t_data["id"] injected = _inject_task_id_into_block( wp_file, "state_hub_task_id", t_db_id, t_id ) if not injected: _inject_task_id_frontmatter_list(wp_file, t_db_id, t_id) report.fixes_applied.append( f"C-11 fixed: task '{t_id}' → {t_db_id[:8]}…" ) elif issue.check_id == "C-12": task_id = ctx["task_id"] if ctx.get("ws_finished"): result = _api_patch(api_base, f"/tasks/{task_id}", {"status": "cancelled"}) if result is not None: report.fixes_applied.append( f"C-12 fixed: orphan task {task_id[:8]}… cancelled (workstream finished)" ) elif issue.check_id == "C-15": # T03 — writeback: DB is ahead of file — patch file to match DB. 
if no_writeback: report.fixes_applied.append( f"C-15 skipped (--no-writeback): task '{ctx['task_block_id']}' " f"db={ctx['db_status']!r}" ) else: wp_file = Path(ctx["wp_file"]) task_block_id = ctx["task_block_id"] db_status = ctx["db_status"] old_status = issue.file_value if _patch_task_status_in_file(wp_file, task_block_id, db_status): committed = _git_commit_writeback( repo_path, wp_file, [f"{task_block_id}: {old_status} → {db_status}"], ) suffix = " (committed)" if committed else " (file patched, commit failed)" report.fixes_applied.append( f"C-15 fixed: task '{task_block_id}' " f"{old_status} → {db_status}{suffix}" ) else: report.fixes_applied.append( f"C-15 SKIP: could not locate task block '{task_block_id}' " f"in {wp_file.name}" ) except Exception as e: report.fixes_applied.append(f"{issue.check_id} ERROR: {e}") # Record that a sync run happened for this repo from datetime import timezone as _tz import datetime as _dt now_iso = _dt.datetime.now(_tz.utc).isoformat() _api_patch(api_base, f"/repos/{repo_slug}", {"last_state_synced_at": now_iso}) # Write the worker orientation brief (.custodian-brief.md) if repo_path: brief_written = _write_custodian_brief(api_base, repo_slug, repo_path) if brief_written: report.fixes_applied.append("brief: .custodian-brief.md updated") # Push all commits made this run (writebacks + brief) to close the loop. # This ensures the next run starts with local == remote, making the # timer idempotent and preventing commit pile-up. if repo_path: push_ok, push_msg = _git_push(repo_path) if push_ok: report.fixes_applied.append(f"push: {push_msg}") else: report.fixes_applied.append(f"push WARN: {push_msg}") return report # Check IDs that are known-background noise in multi-machine setups: # C-08 = completed/archived DB workstream with no file (pre-ADR-001 legacy) # These alone do not warrant a pull+fix cycle. 
_BACKGROUND_CHECKS: frozenset[str] = frozenset({"C-08"})


def _report_needs_action(
    report: ConsistencyReport, behind_remote: bool, ahead_of_remote: int = 0
) -> bool:
    """Return True if the repo warrants a pull+fix cycle.

    A repo is considered clean (no action needed) when:
    - It is not behind its remote tracking branch, AND
    - It has no unpushed local commits (from a prior failed push), AND
    - It has no FAIL issues, AND
    - Every WARN/INFO issue is in the background-noise set (C-08).
    """
    if behind_remote or ahead_of_remote > 0:
        return True
    if report.failures:
        return True
    return any(
        issue.check_id not in _BACKGROUND_CHECKS
        for issue in report.warnings + report.infos
    )


def fix_all_remote(
    api_base: str,
    no_writeback: bool = False,
) -> list[ConsistencyReport]:
    """Pull-then-fix all registered repos that need attention.

    For each repo with a resolvable local path on this machine:
      1. Skip if the path does not exist (repo lives on another machine).
      2. Run check_repo() and the behind/ahead-of-remote detection.
      3. If the repo is clean (see _report_needs_action): skip.
      4. Otherwise: pull, then fix_repo().  A failed pull is recorded as a
         warning and the fix is attempted anyway.

    Returns one ConsistencyReport per repo that was *not* skipped; the pull
    outcome is inserted as the first ``fixes_applied`` entry.  Skipped repos
    are summarised on stdout and are not part of the return value.  Returns
    an empty list (after an error on stderr) when the repo list cannot be
    fetched.
    """
    repos = _api_get(api_base, "/repos")
    if not isinstance(repos, list):
        print("ERROR: Could not fetch repos from state-hub API", file=sys.stderr)
        return []

    reports: list[ConsistencyReport] = []
    skipped_clean: list[str] = []
    skipped_missing: list[str] = []

    for repo in repos:
        slug = repo["slug"]

        # Resolve path using the same priority as check_repo
        path = resolve_repo_path(repo)
        if not path or not Path(path).is_dir():
            skipped_missing.append(slug)
            continue

        # Read-only pass: detect issues and remote staleness.
        # _detect_behind_remote does a fetch, so _detect_ahead_of_remote
        # after it sees an up-to-date tracking branch.
        pre_report = check_repo(api_base, slug)
        behind = _detect_behind_remote(path)
        ahead = _detect_ahead_of_remote(path)

        if not _report_needs_action(pre_report, behind, ahead):
            skipped_clean.append(slug)
            continue

        # Pull before fixing
        pull_ok, pull_msg = _git_pull(path)
        if pull_ok:
            pull_tag = f"pull: {pull_msg}"
        else:
            pull_tag = f"pull WARN: {pull_msg} — proceeding anyway"

        report = fix_repo(api_base, slug, no_writeback=no_writeback)
        report.fixes_applied.insert(0, pull_tag)
        reports.append(report)

    # Print clean/missing summary lines before the detailed reports
    if skipped_clean:
        print(f" CLEAN (skipped): {', '.join(skipped_clean)}")
    if skipped_missing:
        print(f" NOT ON THIS HOST (skipped): {', '.join(skipped_missing)}")
    if skipped_clean or skipped_missing:
        print()

    return reports


# ---------------------------------------------------------------------------
# Output / rendering
# ---------------------------------------------------------------------------

def render_text(report: ConsistencyReport, show_info: bool = True) -> str:
    """Render a report as human-readable text.

    Issues are grouped by severity (FAIL, WARN, INFO); the INFO section is
    omitted when ``show_info`` is false, but INFO issues are still counted in
    the summary line.  Applied fixes and a PASS/FAIL verdict follow.
    """
    SEP = "=" * 66
    lines = [
        "ADR-001 Consistency Report",
        f"Repo: {report.repo_slug}",
        f"Path: {report.repo_path}",
        SEP,
    ]

    for sev in ("FAIL", "WARN", "INFO"):
        section = [i for i in report.issues if i.severity == sev]
        if not section or (sev == "INFO" and not show_info):
            continue
        lines.append(f"\n {sev}S ({len(section)}):")
        for i in section:
            loc = f" [{i.file_path}]" if i.file_path else ""
            fix_tag = " [fixable]" if i.fixable else ""
            lines.append(f" {i.check_id}{loc}{fix_tag}")
            lines.append(f" {i.message}")
            if i.file_value or i.db_value:
                lines.append(f" file={i.file_value!r} db={i.db_value!r}")

    if report.fixes_applied:
        lines.append(f"\n FIXES APPLIED ({len(report.fixes_applied)}):")
        lines.extend(f" {fix}" for fix in report.fixes_applied)

    lines.append(f"\n{SEP}")
    n_fail = len(report.failures)
    n_warn = len(report.warnings)
    n_info = len(report.infos)
    lines.append(f" {n_fail} fail | {n_warn} warn | {n_info} info")
    if n_fail:
        lines.append(" RESULT: ✗ FAIL")
    elif n_warn:
        lines.append(" RESULT: ✓ PASS (with warnings)")
    else:
        lines.append(" RESULT: ✓ PASS")
    lines.append(SEP)
    return "\n".join(lines)


def report_to_dict(report: ConsistencyReport) -> dict:
    """Convert a report into a JSON-serialisable dict.

    ``result`` is "fail" when the report has failures, otherwise "warn" when
    it has warnings, otherwise "pass".
    """
    if report.failures:
        result = "fail"
    elif report.warnings:
        result = "warn"
    else:
        result = "pass"

    return {
        "repo_slug": report.repo_slug,
        "repo_path": report.repo_path,
        "issues": [
            {
                "severity": i.severity,
                "check_id": i.check_id,
                "message": i.message,
                "file_path": i.file_path,
                "db_id": i.db_id,
                "file_value": i.file_value,
                "db_value": i.db_value,
                "fixable": i.fixable,
            }
            for i in report.issues
        ],
        "fixes_applied": report.fixes_applied,
        "summary": {
            "fail": len(report.failures),
            "warn": len(report.warnings),
            "info": len(report.infos),
        },
        "result": result,
    }


# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------

def _pull_and_print(path: str) -> None:
    """Pull the repo at ``path`` and print the outcome (warning on failure)."""
    pull_ok, pull_msg = _git_pull(path)
    prefix = "pull" if pull_ok else "pull WARN"
    print(f" {prefix}: {pull_msg}")


def main() -> None:
    """Parse the command line, run the requested check/fix mode, and exit.

    Exactly one of --repo, --all or --here selects the target; --remote
    implies --fix and pulls before fixing.  Exit status is 1 when any report
    has failures, 2 when any has warnings, 0 otherwise.
    """
    parser = argparse.ArgumentParser(
        description="ADR-001 consistency checker — bidirectional file↔DB validation",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--repo", metavar="SLUG",
                       help="Registered repo slug (e.g. the-custodian)")
    group.add_argument("--all", action="store_true",
                       help="Run checks against all repos with a resolvable path on this host")
    group.add_argument("--here", metavar="PATH", nargs="?", const="",
                       help="Infer repo slug from git remote URL at PATH (default: CWD)")
    parser.add_argument("--fix", action="store_true",
                        help="Apply auto-fixable issues (status drift, repo mismatch, etc.)")
    parser.add_argument("--remote", action="store_true",
                        help="Pull each repo before fixing; when used with --all, skips repos "
                             "that are already clean (no actionable issues, not behind remote). "
                             "Implies --fix.")
    parser.add_argument("--no-writeback", action="store_true", dest="no_writeback",
                        help="Disable DB→file status writeback (C-15) while keeping other fixes")
    parser.add_argument("--repo-path", metavar="PATH", default=None,
                        help="Override the local repo path (useful when the DB has a different "
                             "machine's path). Takes priority over host_paths and local_path.")
    parser.add_argument("--api-base", default="http://127.0.0.1:8000",
                        help="State Hub API base URL")
    parser.add_argument("--json", action="store_true", dest="as_json",
                        help="Output JSON instead of human-readable text")
    args = parser.parse_args()

    import os as _os

    no_wb = args.no_writeback
    do_fix = args.fix or args.remote

    # --here: infer slug from git remote URL, then run as single-repo check/fix
    if args.here is not None:
        search_path = args.here or _os.getcwd()
        inferred = _infer_slug_from_path(args.api_base, search_path)
        if inferred is None:
            print(
                f"ERROR: No registered repo with a matching remote_url found at '{search_path}'.",
                file=sys.stderr,
            )
            print(
                " Register the repo first: POST /repos/ with remote_url set.",
                file=sys.stderr,
            )
            sys.exit(1)
        inferred_slug, git_root = inferred
        print(f" Detected: {inferred_slug} ({git_root})")
        if do_fix:
            # --remote promises a pull before fixing; without it fix_repo's
            # C-16 gate skips every write on a stale checkout.
            if args.remote:
                _pull_and_print(git_root)
            reports = [fix_repo(args.api_base, inferred_slug, git_root, no_writeback=no_wb)]
        else:
            reports = [check_repo(args.api_base, inferred_slug, git_root)]

    # --remote --all: smart pull+fix across all repos
    elif args.remote and args.all:
        reports = fix_all_remote(args.api_base, no_writeback=no_wb)
        if not reports:
            sys.exit(0)

    else:
        # Resolve repo list
        if args.all:
            hostname = socket.gethostname()
            repos = _api_get(args.api_base, "/repos")
            if not isinstance(repos, list):
                print("ERROR: Could not fetch repos from state-hub API", file=sys.stderr)
                sys.exit(1)
            repo_slugs: list[str] = [
                r["slug"] for r in repos
                if r.get("local_path") or (r.get("host_paths") or {}).get(hostname)
            ]
            if not repo_slugs:
                print(
                    f"No repos with a path registered for host '{hostname}'.",
                    file=sys.stderr,
                )
                sys.exit(0)
        else:
            repo_slugs = [args.repo]

        # --repo-path only applies to single-repo runs; silently ignored with --all
        path_override = args.repo_path if not args.all else None

        if args.remote:
            # Single-repo remote: pull first, then fix
            slug = repo_slugs[0]
            repo = _api_get(args.api_base, f"/repos/{slug}")
            path = resolve_repo_path(repo or {}, path_override) if repo else (path_override or "")
            if path:
                _pull_and_print(path)
            reports = [fix_repo(args.api_base, slug, path_override, no_writeback=no_wb)]
        elif do_fix:
            reports = [
                fix_repo(args.api_base, slug, path_override, no_writeback=no_wb)
                for slug in repo_slugs
            ]
        else:
            reports = [check_repo(args.api_base, slug, path_override) for slug in repo_slugs]

    if args.as_json:
        # A single report is emitted as an object, several as a list.
        output = (
            report_to_dict(reports[0]) if len(reports) == 1
            else [report_to_dict(r) for r in reports]
        )
        print(json.dumps(output, indent=2))
    else:
        for report in reports:
            print(render_text(report))
            print()

    any_fail = any(r.failures for r in reports)
    any_warn = any(r.warnings for r in reports)
    sys.exit(1 if any_fail else 2 if any_warn else 0)


if __name__ == "__main__":
    main()