#!/usr/bin/env python3 """consistency_check.py — ADR-001 consistency checking engine. Runs bidirectional checks between workplan files in a registered repo and the state-hub DB. The file is always authoritative; the DB is the cache/index layer. Checks: C-01 workplans-dir FAIL No workplans/ directory missing C-02 workplan-parse FAIL No Workplan file cannot be parsed C-03 workstream-stale-ref FAIL No state_hub_workstream_id in file not in DB C-04 workstream-status-drift WARN Yes File status != DB status (file wins) C-05 workstream-title-drift WARN Yes File title != DB title (file wins) C-06 workstream-unlinked WARN Yes Workplan has no state_hub_workstream_id C-07 orphan-db-active FAIL No Active DB workstream, no backing file C-08 orphan-db-completed INFO No Completed/archived DB workstream, no file C-09 workstream-repo-mismatch FAIL Yes DB workstream repo_id != file location C-10 task-status-drift WARN Yes Task status differs between file and DB C-11 task-unlinked WARN Yes Task block has no state_hub_task_id C-12 orphan-db-task WARN No DB task in workstream has no file backing C-13 workstream-auto-complete WARN Yes All DB tasks done but workstream still active C-14 ghost-duplicate WARN No Active topic workstream with no repo_id matches a file-backed title — probable ghost from premature create_workstream() call Usage: python scripts/consistency_check.py --repo SLUG [--fix] [--json] [--api-base URL] python scripts/consistency_check.py --all [--fix] [--json] [--api-base URL] Exit codes: 0 — ok (no FAILs; only WARNs/INFOs) 1 — one or more FAILs present 2 — warn-only (no FAILs, but WARNs present) """ from __future__ import annotations import argparse import json import re import socket import sys from dataclasses import dataclass, field from pathlib import Path from typing import Any try: import yaml as _yaml _HAS_YAML = True except ImportError: _HAS_YAML = False try: import httpx as _httpx _HAS_HTTPX = True except ImportError: _HAS_HTTPX = False # 
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

# Matches one fenced ```task ... ``` block; group(1) is the YAML inside it.
_TASK_BLOCK_RE = re.compile(r"```task\s*\n(.*?)\n```", re.DOTALL)

VALID_WP_STATUSES = {"active", "completed", "archived"}
VALID_TASK_STATUSES = {"todo", "in_progress", "blocked", "done", "cancelled"}
VALID_TASK_PRIORITIES = {"low", "medium", "high", "critical"}

# Workplan files use task-style vocabulary ("done"); the DB workstream API uses
# "completed". This map translates file values to DB values before comparison
# and before PATCHing, so "done" vs "completed" is never flagged as C-04 drift.
FILE_TO_DB_WORKSTREAM_STATUS: dict[str, str] = {
    "done": "completed",
}


def normalise_workstream_status(status: str) -> str:
    """Map a workplan-file status onto the DB vocabulary.

    Values without an entry in FILE_TO_DB_WORKSTREAM_STATUS are returned
    unchanged.
    """
    try:
        return FILE_TO_DB_WORKSTREAM_STATUS[status]
    except KeyError:
        return status


# ---------------------------------------------------------------------------
# Data types
# ---------------------------------------------------------------------------

@dataclass
class Issue:
    """One finding produced by a consistency check."""

    severity: str  # FAIL | WARN | INFO
    check_id: str  # check identifier, e.g. "C-04" (C-00 through C-14 are emitted)
    message: str
    file_path: str = ""
    db_id: str = ""
    file_value: str = ""
    db_value: str = ""
    fixable: bool = False
    # Data the fix engine needs to repair this issue; kept out of repr().
    _fix_context: dict = field(default_factory=dict, repr=False)


@dataclass
class ConsistencyReport:
    """All issues found for one repo, plus a log of the fixes applied."""

    repo_slug: str
    repo_path: str
    issues: list[Issue] = field(default_factory=list)
    fixes_applied: list[str] = field(default_factory=list)

    def add(self, **kwargs) -> Issue:
        """Build an Issue from the keyword arguments, record it, and return it."""
        created = Issue(**kwargs)
        self.issues.append(created)
        return created

    def _with_severity(self, level: str) -> list[Issue]:
        """Return the recorded issues whose severity equals ``level``."""
        return list(filter(lambda entry: entry.severity == level, self.issues))

    @property
    def failures(self) -> list[Issue]:
        """Issues with severity FAIL."""
        return self._with_severity("FAIL")

    @property
    def warnings(self) -> list[Issue]:
        """Issues with severity WARN."""
        return self._with_severity("WARN")

    @property
    def infos(self) -> list[Issue]:
        """Issues with severity INFO."""
        return self._with_severity("INFO")
# ---------------------------------------------------------------------------
# YAML / frontmatter parsing
# ---------------------------------------------------------------------------

# Spellings of "no value" that can appear for an ID field in a workplan file.
_NULL_ID_STRINGS = ("", "~", "null", "None", "none")


def _parse_yaml_block(raw: str) -> dict:
    """Parse a YAML mapping; fall back to simple key:value if pyyaml is unavailable.

    Always returns a dict. Invalid YAML, or YAML whose top level is not a
    mapping (a bare scalar or a list), yields ``{"_parse_error": True}`` so
    callers can keep using ``.get`` without type checks.
    """
    if _HAS_YAML:
        try:
            parsed = _yaml.safe_load(raw) or {}
        except _yaml.YAMLError:
            return {"_parse_error": True}
        if not isinstance(parsed, dict):
            # e.g. a frontmatter that is just a string or a list
            return {"_parse_error": True}
        return parsed

    # Minimal fallback: top-level "key: value" lines only, quotes stripped.
    result: dict = {}
    for line in raw.splitlines():
        if ":" in line and not line.startswith(" "):
            k, _, v = line.partition(":")
            result[k.strip()] = v.strip().strip('"').strip("'")
    return result


def parse_frontmatter(text: str) -> tuple[dict, str]:
    """Split YAML frontmatter from body. Returns ({}, text) if no frontmatter."""
    if not text.startswith("---"):
        return {}, text
    parts = text.split("---", 2)
    if len(parts) < 3:
        # Opening delimiter without a closing one.
        return {}, text
    meta = _parse_yaml_block(parts[1].strip())
    return meta, parts[2]


def parse_task_blocks(body: str) -> list[dict]:
    """Extract all ```task ... ``` YAML blocks from a workplan body."""
    return [_parse_yaml_block(m.group(1).strip()) for m in _TASK_BLOCK_RE.finditer(body)]


def get_tasks_from_workplan(meta: dict, body: str) -> list[dict]:
    """Get tasks from workplan — handles both ```task``` blocks and tasks: YAML list.

    Fenced blocks in the body take precedence; the frontmatter ``tasks`` list
    is only consulted when the body contains no task blocks.
    """
    blocks = parse_task_blocks(body)
    if blocks:
        return blocks
    # Fallback: tasks embedded as a YAML list in frontmatter (activity-core style)
    tasks = meta.get("tasks")
    if isinstance(tasks, list):
        return tasks
    return []


# ---------------------------------------------------------------------------
# File update helpers
# ---------------------------------------------------------------------------

def _add_frontmatter_field(file_path: Path, key: str, value: str) -> None:
    """Write ``key: "value"`` into the file's frontmatter.

    If the frontmatter already has a top-level ``key:`` line whose value is
    blank or null-like, that line is replaced so the key is not duplicated.
    Otherwise the new line is inserted just before the closing ``---``.
    The file is left untouched when no closing ``---`` line is found.
    """
    text = file_path.read_text(encoding="utf-8")
    lines = text.split("\n")
    # Line 0 is the opening delimiter; the next bare "---" closes the frontmatter.
    close_idx = next(
        (i for i, line in enumerate(lines[1:], 1) if line.strip() == "---"),
        None,
    )
    if close_idx is None:
        return

    new_line = f'{key}: "{value}"'
    prefix = f"{key}:"
    for i in range(1, close_idx):
        if lines[i].startswith(prefix):
            current = lines[i][len(prefix):].strip().strip('"').strip("'")
            if current in _NULL_ID_STRINGS:
                lines[i] = new_line
                break
    else:
        lines.insert(close_idx, new_line)
    file_path.write_text("\n".join(lines), encoding="utf-8")


def _inject_task_id_into_block(
    file_path: Path, field_name: str, field_value: str, match_id: str
) -> bool:
    """Inject ``field_name: "field_value"`` into the ```task``` block whose id == match_id.

    A block that already carries a non-null value for ``field_name`` is left
    alone. Returns True when the file was rewritten, False otherwise.
    """
    text = file_path.read_text(encoding="utf-8")
    new_line = f'{field_name}: "{field_value}"'
    field_line_re = re.compile(rf"^{re.escape(field_name)}:.*$", re.MULTILINE)

    def _replace(m: re.Match) -> str:
        block_content = m.group(1)
        task_meta = _parse_yaml_block(block_content.strip())
        if str(task_meta.get("id", "")) != match_id:
            return m.group(0)
        existing_val = task_meta.get(field_name)
        if existing_val is not None and str(existing_val).strip() not in _NULL_ID_STRINGS:
            return m.group(0)
        # Replace an existing null/~ line if present, otherwise append.
        # A callable replacement keeps backslashes in the value literal.
        new_content = field_line_re.sub(lambda _m: new_line, block_content)
        if new_content == block_content:
            new_content = block_content.rstrip() + "\n" + new_line
        return f"```task\n{new_content}\n```"

    new_text = _TASK_BLOCK_RE.sub(_replace, text)
    if new_text != text:
        file_path.write_text(new_text, encoding="utf-8")
        return True
    return False


def _inject_task_id_frontmatter_list(
    file_path: Path, field_value: str, match_id: str
) -> bool:
    """Inject state_hub_task_id into a task entry in the frontmatter ``tasks:`` list.

    Entries whose ``state_hub_task_id`` is missing or null-like are filled,
    which matches how the ```task``` block variant treats null values.
    The frontmatter is re-serialised with key order preserved. Returns True
    when the file was rewritten; False when pyyaml is unavailable, nothing
    matched, or the rewrite failed.
    """
    if not _HAS_YAML:
        return False
    text = file_path.read_text(encoding="utf-8")
    meta, _body = parse_frontmatter(text)
    tasks = meta.get("tasks")
    if not isinstance(tasks, list):
        return False

    changed = False
    for entry in tasks:
        if not isinstance(entry, dict) or str(entry.get("id", "")) != match_id:
            continue
        current = entry.get("state_hub_task_id")
        if current is not None and str(current).strip() not in _NULL_ID_STRINGS:
            continue  # already linked
        entry["state_hub_task_id"] = field_value
        changed = True
    if not changed:
        return False

    parts = text.split("---", 2)
    if len(parts) < 3:
        return False
    try:
        new_meta_str = _yaml.dump(
            meta, allow_unicode=True, default_flow_style=False, sort_keys=False
        )
        file_path.write_text(f"---\n{new_meta_str}---{parts[2]}", encoding="utf-8")
    except (_yaml.YAMLError, OSError, TypeError, ValueError):
        # Best effort: the caller treats False as "ID not written".
        return False
    return True
# ---------------------------------------------------------------------------
# API helpers
# ---------------------------------------------------------------------------

def _with_trailing_slash(path: str) -> str:
    """Return ``path`` ending in exactly the trailing slash the API routes use."""
    return path if path.endswith("/") else path + "/"


def _api_get(api_base: str, path: str, params: dict | None = None) -> Any:
    """GET ``path`` and return the decoded JSON, or None on any failure.

    None is also returned when httpx is not installed. Parameters whose value
    is None are dropped from the query string.
    """
    if not _HAS_HTTPX:
        return None
    path = _with_trailing_slash(path)
    query = {k: v for k, v in (params or {}).items() if v is not None}
    try:
        with _httpx.Client(base_url=api_base, timeout=10.0, follow_redirects=True) as c:
            r = c.get(path, params=query)
            r.raise_for_status()
            return r.json()
    except Exception:
        # Deliberate best effort: callers treat None as "not found / unavailable".
        return None


def _api_patch(api_base: str, path: str, body: dict) -> Any:
    """PATCH ``path`` with a JSON body.

    Returns the decoded JSON on success, None when httpx is not installed,
    and ``{"_error": "<message>"}`` when the request fails.
    """
    if not _HAS_HTTPX:
        return None
    path = _with_trailing_slash(path)
    try:
        with _httpx.Client(base_url=api_base, timeout=10.0, follow_redirects=True) as c:
            r = c.patch(path, json=body)
            r.raise_for_status()
            return r.json()
    except Exception as exc:
        # Return a sentinel dict so callers can distinguish "API error" from "success"
        # and report it rather than silently dropping the fix.
        return {"_error": str(exc)}


def _api_post(api_base: str, path: str, body: dict) -> Any:
    """POST a JSON body to ``path``; return the decoded JSON, or None on any failure."""
    if not _HAS_HTTPX:
        return None
    path = _with_trailing_slash(path)
    try:
        with _httpx.Client(base_url=api_base, timeout=10.0, follow_redirects=True) as c:
            r = c.post(path, json=body)
            r.raise_for_status()
            return r.json()
    except Exception:
        # Deliberate best effort: callers treat None as "creation failed".
        return None


# ---------------------------------------------------------------------------
# Core check engine
# ---------------------------------------------------------------------------

# String spellings of a null ID that can survive YAML parsing / str() conversion.
_NULL_ID_TOKENS = ("~", "null", "None", "none")


def _clean_id(raw: Any) -> str:
    """Normalise an ID read from a workplan; null-like values become ""."""
    if raw is None:
        return ""
    text = str(raw).strip().strip('"')
    return "" if text in _NULL_ID_TOKENS else text


def resolve_repo_path(repo: dict, override: str | None = None) -> str:
    """Resolve the local filesystem path for a repo on the current machine.

    Priority:
      1. Explicit --repo-path CLI override
      2. host_paths[current_hostname] — per-machine path registered via
         POST /repos/{slug}/paths/
      3. local_path — legacy single-path field (backward compat)

    Returns "" when none of the three yields a path.
    """
    if override:
        return override
    hostname = socket.gethostname()
    host_paths = repo.get("host_paths") or {}
    return host_paths.get(hostname) or repo.get("local_path") or ""


def _check_tasks(
    api_base: str,
    ws: dict,
    ws_id: str,
    wp_file: Path,
    meta: dict,
    body: str,
    report: ConsistencyReport,
) -> Any:
    """Run the task-level checks (task C-03, C-10, C-11, C-12) for one workplan.

    Returns whatever the ``/tasks`` listing for the workstream returned
    (a list on success) so the caller can reuse it for C-13.
    """
    fname = wp_file.name
    tasks = get_tasks_from_workplan(meta, body)
    db_tasks = _api_get(api_base, "/tasks", {"workstream_id": ws_id})
    # The listing doubles as a cache so linked tasks do not each cost a GET.
    db_task_by_id: dict[str, dict] = {}
    if isinstance(db_tasks, list):
        db_task_by_id = {t["id"]: t for t in db_tasks}

    file_task_sh_ids: set[str] = set()
    for task in tasks:
        if not isinstance(task, dict) or task.get("_parse_error"):
            continue
        t_id = str(task.get("id", "")).strip()
        t_sh_id = _clean_id(task.get("state_hub_task_id"))
        t_status = str(task.get("status", "")).strip()

        if t_sh_id:
            file_task_sh_ids.add(t_sh_id)
            db_task = db_task_by_id.get(t_sh_id)
            if db_task is None:
                # Not in this workstream's listing — ask the API directly.
                db_task = _api_get(api_base, f"/tasks/{t_sh_id}")
            if db_task is None:
                report.add(
                    severity="FAIL",
                    check_id="C-03",
                    message=f"state_hub_task_id {t_sh_id[:8]}… not found in DB",
                    file_path=f"{fname}#{t_id}",
                    db_id=t_sh_id,
                    fixable=False,
                )
                continue
            # C-10: task status drift
            db_t_status = db_task.get("status", "")
            if t_status and db_t_status and t_status != db_t_status:
                report.add(
                    severity="WARN",
                    check_id="C-10",
                    message=(
                        f"Task status drift '{t_id}': "
                        f"file={t_status!r} db={db_t_status!r} (file wins)"
                    ),
                    file_path=f"{fname}#{t_id}",
                    db_id=t_sh_id,
                    file_value=t_status,
                    db_value=db_t_status,
                    fixable=True,
                    _fix_context={"task_id": t_sh_id, "status": t_status},
                )
        elif t_id:
            # C-11: task exists in file but not linked to DB
            report.add(
                severity="WARN",
                check_id="C-11",
                message=f"Task '{t_id}' has no state_hub_task_id",
                file_path=f"{fname}#{t_id}",
                fixable=True,
                _fix_context={
                    "ws_id": ws_id,
                    "task": task,
                    "wp_file": str(wp_file),
                    "meta": meta,
                    "body": body,
                },
            )

    # C-12: DB tasks with no file backing
    if isinstance(db_tasks, list):
        for db_t in db_tasks:
            if db_t["id"] not in file_task_sh_ids:
                report.add(
                    severity="WARN",
                    check_id="C-12",
                    message=(
                        f"DB task '{db_t.get('title', '')}' "
                        f"(id={db_t['id'][:8]}…, status={db_t.get('status', '')}) "
                        f"in workstream '{ws.get('slug')}' has no file backing"
                    ),
                    db_id=db_t["id"],
                    fixable=False,
                )
    return db_tasks


def check_repo(api_base: str, repo_slug: str, repo_path_override: str | None = None) -> ConsistencyReport:
    """Run all consistency checks for a registered repo.

    Args:
        api_base: Base URL of the state-hub API.
        repo_slug: Slug of the registered repo to check.
        repo_path_override: Optional local path that takes priority over the
            paths stored in the DB.

    Returns:
        A ConsistencyReport listing every issue found. Nothing is modified.
    """
    repo = _api_get(api_base, f"/repos/{repo_slug}")
    if repo is None:
        report = ConsistencyReport(repo_slug=repo_slug, repo_path="(unknown)")
        report.add(
            severity="FAIL",
            check_id="C-00",
            message=f"Repo '{repo_slug}' not found in state-hub DB",
            fixable=False,
        )
        return report

    repo_id: str = repo["id"]
    repo_path: str = resolve_repo_path(repo, repo_path_override)
    report = ConsistencyReport(repo_slug=repo_slug, repo_path=repo_path)

    if not repo_path:
        report.add(
            severity="FAIL",
            check_id="C-00",
            message=f"Repo '{repo_slug}' has no local_path — cannot check workplan files",
            fixable=False,
        )
        _check_orphan_db(api_base, repo_id, set(), report)
        return report

    repo_dir = Path(repo_path)
    workplans_dir = repo_dir / "workplans"

    # C-01: workplans/ directory missing
    if not workplans_dir.is_dir():
        report.add(
            severity="FAIL",
            check_id="C-01",
            message=(
                "workplans/ directory missing — ADR-001 requires workplan files "
                "at /workplans/-.md"
            ),
            fixable=False,
        )
        _check_orphan_db(api_base, repo_id, set(), report)
        return report

    # Parse workplan files
    workplan_infos: list[tuple[Path, dict, str]] = []
    file_ws_ids: dict[str, tuple[Path, dict, str]] = {}  # ws_id → (file, meta, body)

    for wp_file in sorted(workplans_dir.glob("*.md")):
        try:
            text = wp_file.read_text(encoding="utf-8")
        except OSError as e:
            report.add(severity="FAIL", check_id="C-02",
                       message=f"Cannot read file: {e}", file_path=wp_file.name)
            continue

        if not text.startswith("---"):
            report.add(severity="FAIL", check_id="C-02",
                       message="No YAML frontmatter found", file_path=wp_file.name)
            continue

        meta, body = parse_frontmatter(text)
        if not isinstance(meta, dict) or not meta or meta.get("_parse_error"):
            report.add(severity="FAIL", check_id="C-02",
                       message="YAML frontmatter parse error", file_path=wp_file.name)
            continue

        workplan_infos.append((wp_file, meta, body))
        ws_id = _clean_id(meta.get("state_hub_workstream_id"))
        if ws_id:
            file_ws_ids[ws_id] = (wp_file, meta, body)

    # Per-workplan checks
    for wp_file, meta, body in workplan_infos:
        fname = wp_file.name
        # A null/empty ID means "unlinked" (C-06), not a stale reference (C-03).
        ws_id = _clean_id(meta.get("state_hub_workstream_id"))
        file_status = str(meta.get("status", "")).strip()
        file_title = str(meta.get("title", "")).strip()
        file_domain = str(meta.get("domain", "")).strip()

        if not ws_id:
            # C-06: workplan not linked to any DB workstream
            report.add(
                severity="WARN",
                check_id="C-06",
                message="Workplan has no state_hub_workstream_id — not indexed in DB",
                file_path=fname,
                file_value=file_title or fname,
                fixable=True,
                _fix_context={
                    "wp_file": str(wp_file),
                    "meta": meta,
                    "body": body,
                    "repo_id": repo_id,
                    "domain": file_domain,
                },
            )
            continue

        ws = _api_get(api_base, f"/workstreams/{ws_id}")
        if ws is None:
            # C-03: stale workstream reference
            report.add(
                severity="FAIL",
                check_id="C-03",
                message=f"state_hub_workstream_id {ws_id[:8]}… not found in DB (stale reference)",
                file_path=fname,
                db_id=ws_id,
                fixable=False,
            )
            continue

        # C-09: repo mismatch — file is here but DB says different repo
        db_repo_id: str = ws.get("repo_id") or ""
        if db_repo_id != repo_id:
            report.add(
                severity="FAIL",
                check_id="C-09",
                message=(
                    f"Workstream '{ws.get('slug')}' repo_id in DB is "
                    f"{db_repo_id[:8] if db_repo_id else 'None'} "
                    f"but backing file lives in {repo_slug} ({repo_id[:8]}…)"
                ),
                file_path=fname,
                db_id=ws_id,
                file_value=repo_id,
                db_value=db_repo_id,
                fixable=True,
                _fix_context={"ws_id": ws_id, "correct_repo_id": repo_id},
            )
            # Continue to check drift even with mismatched repo

        # C-04: status drift — normalise file value before comparing so that
        # "done" (file) vs "completed" (DB) is not treated as drift.
        db_status = ws.get("status", "")
        normalised_file_status = normalise_workstream_status(file_status)
        if file_status and db_status and normalised_file_status != db_status:
            report.add(
                severity="WARN",
                check_id="C-04",
                message=(
                    f"Status drift in '{ws.get('slug')}': "
                    f"file={file_status!r} db={db_status!r} (file wins)"
                ),
                file_path=fname,
                db_id=ws_id,
                file_value=file_status,
                db_value=db_status,
                fixable=True,
                _fix_context={
                    "ws_id": ws_id,
                    "field": "status",
                    "value": normalised_file_status,  # always send DB-valid value
                },
            )

        # C-05: title drift
        db_title = ws.get("title", "")
        if file_title and db_title and file_title != db_title:
            report.add(
                severity="WARN",
                check_id="C-05",
                message=(
                    f"Title drift in '{ws.get('slug')}': "
                    f"file={file_title!r} db={db_title!r} (file wins)"
                ),
                file_path=fname,
                db_id=ws_id,
                file_value=file_title,
                db_value=db_title,
                fixable=True,
                _fix_context={"ws_id": ws_id, "field": "title", "value": file_title},
            )

        # C-10, C-11, C-12 (and task-level C-03): task-level checks
        db_tasks = _check_tasks(api_base, ws, ws_id, wp_file, meta, body, report)

        # C-13: all DB tasks done but workstream still active — worker forgot to close
        if db_status == "active" and isinstance(db_tasks, list) and db_tasks:
            all_terminal = all(
                t.get("status") in ("done", "cancelled") for t in db_tasks
            )
            if all_terminal:
                report.add(
                    severity="WARN",
                    check_id="C-13",
                    message=(
                        f"All {len(db_tasks)} DB tasks for '{ws.get('slug')}' are "
                        f"done/cancelled but workstream status is still 'active' — "
                        f"worker likely forgot update_workstream_status()"
                    ),
                    file_path=fname,
                    db_id=ws_id,
                    db_value="active",
                    fixable=True,
                    _fix_context={
                        "ws_id": ws_id,
                        "field": "status",
                        "value": "completed",
                    },
                )

    # C-07 / C-08: orphan DB workstreams (have repo_id=this_repo but no backing file)
    _check_orphan_db(api_base, repo_id, set(file_ws_ids.keys()), report)

    # C-14: ghost duplicate — active workstream on same topic with no repo_id whose
    # title matches a file-backed workstream. Root cause: create_workstream() called
    # before the workplan file was written; fix-consistency then created a second
    # workstream from the file, leaving the first as an invisible orphan.
    _check_ghost_duplicates(api_base, workplan_infos, file_ws_ids, report)

    return report


def _check_orphan_db(
    api_base: str, repo_id: str, file_ws_ids: set[str], report: ConsistencyReport
) -> None:
    """Flag DB workstreams with repo_id=this_repo that have no backing workplan file.

    Active orphans are C-07 (FAIL); completed/archived orphans are C-08 (INFO).
    Workstreams in any other status are not reported. Does nothing when the
    workstream listing cannot be fetched.
    """
    all_ws = _api_get(api_base, "/workstreams", {"repo_id": repo_id})
    if not isinstance(all_ws, list):
        return

    for ws in all_ws:
        ws_id = ws["id"]
        if ws_id in file_ws_ids:
            continue
        ws_status = ws.get("status", "")
        ws_slug = ws.get("slug", "")
        if ws_status == "active":
            report.add(
                severity="FAIL",
                check_id="C-07",
                message=(
                    f"Active DB workstream '{ws_slug}' (id={ws_id[:8]}…) "
                    f"has no backing workplan file — ADR-001 violation"
                ),
                db_id=ws_id,
                fixable=False,
            )
        elif ws_status in ("completed", "archived"):
            report.add(
                severity="INFO",
                check_id="C-08",
                message=(
                    f"Completed/archived DB workstream '{ws_slug}' "
                    f"(id={ws_id[:8]}…, status={ws_status}) has no backing workplan file"
                ),
                db_id=ws_id,
                fixable=False,
            )


def _check_ghost_duplicates(
    api_base: str,
    workplan_infos: list[tuple],
    file_ws_ids: dict[str, tuple],
    report: ConsistencyReport,
) -> None:
    """C-14: detect active workstreams with repo_id=null whose title matches a
    file-backed workstream — these are ghosts created by premature
    create_workstream() calls before the workplan file existed.

    Titles are compared case-insensitively after stripping whitespace.
    """
    # Build lookup: normalised title → file-backed workstream id
    file_titles: dict[str, str] = {}
    for _, meta, _ in workplan_infos:
        ws_id = _clean_id(meta.get("state_hub_workstream_id"))
        title = str(meta.get("title", "")).strip().lower()
        if title and ws_id:
            file_titles[title] = ws_id

    if not file_titles:
        return

    # Gather topic_ids from all file-backed workstreams so we can query by topic
    topic_ids: set[str] = set()
    for ws_id in file_ws_ids:
        ws = _api_get(api_base, f"/workstreams/{ws_id}")
        if ws and ws.get("topic_id"):
            topic_ids.add(ws["topic_id"])

    for topic_id in topic_ids:
        topic_ws = _api_get(api_base, "/workstreams", {"topic_id": topic_id, "status": "active"})
        if not isinstance(topic_ws, list):
            continue
        for ws in topic_ws:
            ws_id = ws["id"]
            if ws_id in file_ws_ids:
                continue  # legitimately linked
            if ws.get("repo_id"):
                continue  # C-07 covers repo-scoped orphans
            # "or ''" guards against an explicit null title in the DB record.
            ws_title = str(ws.get("title") or "").strip().lower()
            if ws_title in file_titles:
                file_backed_id = file_titles[ws_title]
                report.add(
                    severity="WARN",
                    check_id="C-14",
                    message=(
                        f"Ghost duplicate: active workstream '{ws.get('slug')}' "
                        f"(id={ws_id[:8]}…, repo_id=null) has same title as "
                        f"file-backed workstream {file_backed_id[:8]}… — "
                        f"likely created via create_workstream() before workplan file existed; "
                        f"archive it"
                    ),
                    db_id=ws_id,
                    fixable=False,
                )


# ---------------------------------------------------------------------------
# Fix engine
# ---------------------------------------------------------------------------

def _record_patch_outcome(
    report: ConsistencyReport, result: Any, check_id: str, description: str
) -> bool:
    """Log the outcome of an ``_api_patch`` call in ``report.fixes_applied``.

    Nothing is logged for None (httpx unavailable). The ``{"_error": ...}``
    sentinel is logged as FAILED; anything else is logged as fixed.
    Returns True only when the patch succeeded.
    """
    if result is None:
        return False
    if isinstance(result, dict) and "_error" in result:
        report.fixes_applied.append(
            f"{check_id} FAILED: {description}: {result['_error']}"
        )
        return False
    report.fixes_applied.append(f"{check_id} fixed: {description}")
    return True


def _create_and_link_task(
    api_base: str, ws_id: str, task: dict, wp_file: Path
) -> str | None:
    """Create a DB task for a file task and write the new ID back into the file.

    Unknown status/priority values fall back to "todo"/"medium". The ID is
    written to the matching ```task``` block, or failing that to the
    frontmatter tasks list. Returns the new DB task id, or None if the task
    could not be created.
    """
    t_id = str(task.get("id", "")).strip()
    t_status = str(task.get("status", "todo")).strip()
    if t_status not in VALID_TASK_STATUSES:
        t_status = "todo"
    t_priority = str(task.get("priority", "medium")).strip()
    if t_priority not in VALID_TASK_PRIORITIES:
        t_priority = "medium"

    t_data = _api_post(api_base, "/tasks", {
        "workstream_id": ws_id,
        "title": str(task.get("title", t_id)).strip() or t_id,
        "status": t_status,
        "priority": t_priority,
        "assignee": task.get("assignee") or None,
    })
    if not t_data:
        return None

    t_db_id = t_data["id"]
    injected = _inject_task_id_into_block(wp_file, "state_hub_task_id", t_db_id, t_id)
    if not injected:
        _inject_task_id_frontmatter_list(wp_file, t_db_id, t_id)
    return t_db_id


def _fix_unlinked_workplan(api_base: str, ctx: dict, report: ConsistencyReport) -> None:
    """C-06 fix: create the workstream (and its tasks) in the DB and link the file."""
    wp_file = Path(ctx["wp_file"])
    meta = ctx["meta"]
    domain = ctx["domain"]
    repo_id_val = ctx["repo_id"]
    body = ctx.get("body", "")

    wp_id = str(meta.get("id", "")).strip()
    title = str(meta.get("title", "")).strip()
    # Translate file vocabulary ("done") first so it is not downgraded to "active".
    status = normalise_workstream_status(str(meta.get("status", "active")).strip())
    if status not in VALID_WP_STATUSES:
        status = "active"

    # Find topic_id for this domain
    topics = _api_get(api_base, "/topics")
    topic_id = None
    if isinstance(topics, list):
        for t in topics:
            if t.get("domain_slug") == domain:
                topic_id = t["id"]
                break

    if topic_id is None:
        report.fixes_applied.append(
            f"C-06 SKIP {wp_id}: no topic found for domain '{domain}'"
        )
        return

    slug = re.sub(r"[^a-z0-9-]", "-", wp_id.lower()).strip("-")
    ws_data = _api_post(api_base, "/workstreams", {
        "topic_id": topic_id,
        "repo_id": repo_id_val,
        "slug": slug,
        "title": title or wp_id,
        "status": status,
        "owner": str(meta.get("owner", "")).strip() or None,
    })
    if ws_data is None:
        report.fixes_applied.append(
            f"C-06 FAIL {wp_id}: could not create workstream in DB"
        )
        return

    new_ws_id = ws_data["id"]
    _add_frontmatter_field(wp_file, "state_hub_workstream_id", new_ws_id)
    report.fixes_applied.append(
        f"C-06 fixed: created workstream {new_ws_id[:8]}… "
        f"for {wp_id}, wrote ID to {wp_file.name}"
    )

    # Create tasks and inject IDs
    for task in get_tasks_from_workplan(meta, body):
        if not isinstance(task, dict) or task.get("_parse_error"):
            continue
        t_id = str(task.get("id", "")).strip()
        if not t_id:
            continue
        t_db_id = _create_and_link_task(api_base, new_ws_id, task, wp_file)
        if t_db_id:
            report.fixes_applied.append(f" + task {t_id} → {t_db_id[:8]}…")


def fix_repo(api_base: str, repo_slug: str, repo_path_override: str | None = None) -> ConsistencyReport:
    """Run checks then apply all auto-fixable issues. Returns updated report.

    Each fix attempt is logged in ``report.fixes_applied``; an exception in
    one fix is logged as ``<check_id> ERROR`` and does not stop the others.
    The report's issue list still reflects the state before fixing.
    Finally the repo's ``last_state_synced_at`` is PATCHed with the current
    UTC time.
    """
    report = check_repo(api_base, repo_slug, repo_path_override)

    fixable = [i for i in report.issues if i.fixable]
    for issue in fixable:
        ctx = issue._fix_context
        try:
            if issue.check_id in ("C-04", "C-05", "C-13"):
                ws_id = ctx["ws_id"]
                result = _api_patch(api_base, f"/workstreams/{ws_id}",
                                    {ctx["field"]: ctx["value"]})
                _record_patch_outcome(
                    report, result, issue.check_id,
                    f"workstream {ws_id[:8]}… {ctx['field']} → {ctx['value']!r}",
                )

            elif issue.check_id == "C-06":
                _fix_unlinked_workplan(api_base, ctx, report)

            elif issue.check_id == "C-09":
                ws_id = ctx["ws_id"]
                correct_repo_id = ctx["correct_repo_id"]
                result = _api_patch(api_base, f"/workstreams/{ws_id}",
                                    {"repo_id": correct_repo_id})
                # Must inspect the result: a failed PATCH returns an error dict, not None.
                _record_patch_outcome(
                    report, result, "C-09",
                    f"workstream {ws_id[:8]}… repo_id → {correct_repo_id[:8]}…",
                )

            elif issue.check_id == "C-10":
                task_id = ctx["task_id"]
                status = ctx["status"]
                result = _api_patch(api_base, f"/tasks/{task_id}", {"status": status})
                _record_patch_outcome(
                    report, result, "C-10",
                    f"task {task_id[:8]}… status → {status!r}",
                )

            elif issue.check_id == "C-11":
                task = ctx["task"]
                t_id = str(task.get("id", "")).strip()
                t_db_id = _create_and_link_task(
                    api_base, ctx["ws_id"], task, Path(ctx["wp_file"])
                )
                if t_db_id:
                    report.fixes_applied.append(
                        f"C-11 fixed: task '{t_id}' → {t_db_id[:8]}…"
                    )

        except Exception as e:
            # Boundary handler: one broken fix must not abort the remaining ones.
            report.fixes_applied.append(f"{issue.check_id} ERROR: {e}")

    # Record that a sync run happened for this repo
    from datetime import datetime, timezone
    now_iso = datetime.now(timezone.utc).isoformat()
    _api_patch(api_base, f"/repos/{repo_slug}/", {"last_state_synced_at": now_iso})

    return report


# ---------------------------------------------------------------------------
# Output / rendering
# ---------------------------------------------------------------------------

def render_text(report: ConsistencyReport, show_info: bool = True) -> str:
    """Render a report as human-readable text.

    Issues are grouped FAIL, WARN, INFO; the INFO section is omitted when
    ``show_info`` is false (the summary counts still include INFO issues).
    """
    SEP = "=" * 66
    lines = [
        "ADR-001 Consistency Report",
        f"Repo: {report.repo_slug}",
        f"Path: {report.repo_path}",
        SEP,
    ]

    for sev in ("FAIL", "WARN", "INFO"):
        section = [i for i in report.issues if i.severity == sev]
        if not section or (sev == "INFO" and not show_info):
            continue
        lines.append(f"\n {sev}S ({len(section)}):")
        for i in section:
            loc = f" [{i.file_path}]" if i.file_path else ""
            fix_tag = " [fixable]" if i.fixable else ""
            lines.append(f" {i.check_id}{loc}{fix_tag}")
            lines.append(f" {i.message}")
            if i.file_value or i.db_value:
                lines.append(f" file={i.file_value!r} db={i.db_value!r}")

    if report.fixes_applied:
        lines.append(f"\n FIXES APPLIED ({len(report.fixes_applied)}):")
        lines.extend(f" {entry}" for entry in report.fixes_applied)

    lines.append(f"\n{SEP}")
    n_fail = len(report.failures)
    n_warn = len(report.warnings)
    n_info = len(report.infos)
    lines.append(f" {n_fail} fail | {n_warn} warn | {n_info} info")

    if n_fail:
        lines.append(" RESULT: ✗ FAIL")
    elif n_warn:
        lines.append(" RESULT: ✓ PASS (with warnings)")
    else:
        lines.append(" RESULT: ✓ PASS")
    lines.append(SEP)
    return "\n".join(lines)


def report_to_dict(report: ConsistencyReport) -> dict:
    """Convert a report into a JSON-serialisable dict.

    The private ``_fix_context`` of each issue is not included.
    """
    return {
        "repo_slug": report.repo_slug,
        "repo_path": report.repo_path,
        "issues": [
            {
                "severity": i.severity,
                "check_id": i.check_id,
                "message": i.message,
                "file_path": i.file_path,
                "db_id": i.db_id,
                "file_value": i.file_value,
                "db_value": i.db_value,
                "fixable": i.fixable,
            }
            for i in report.issues
        ],
        "fixes_applied": report.fixes_applied,
        "summary": {
            "fail": len(report.failures),
            "warn": len(report.warnings),
            "info": len(report.infos),
        },
        "result": (
            "fail" if report.failures
            else "warn" if report.warnings
            else "pass"
        ),
    }


# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------

def main() -> None:
    """Parse CLI arguments, run the checker (or fixer), print, and exit.

    Exit status: 1 if any report has a FAIL, otherwise 2 if any report has
    a WARN, otherwise 0.
    """
    parser = argparse.ArgumentParser(
        description="ADR-001 consistency checker — bidirectional file↔DB validation",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--repo", metavar="SLUG",
                       help="Registered repo slug (e.g. the-custodian)")
    group.add_argument("--all", action="store_true",
                       help="Run checks against all registered repos with local_path")
    parser.add_argument("--fix", action="store_true",
                        help="Apply auto-fixable issues (status drift, repo mismatch, etc.)")
    parser.add_argument("--repo-path", metavar="PATH", default=None,
                        help="Override the local repo path (useful when the DB has a different "
                             "machine's path). Takes priority over host_paths and local_path.")
    parser.add_argument("--api-base", default="http://127.0.0.1:8000",
                        help="State Hub API base URL")
    parser.add_argument("--json", action="store_true", dest="as_json",
                        help="Output JSON instead of human-readable text")
    args = parser.parse_args()

    # Resolve repo list
    repo_slugs: list[str] = []
    if args.all:
        repos = _api_get(args.api_base, "/repos")
        if not isinstance(repos, list):
            print("ERROR: Could not fetch repos from state-hub API", file=sys.stderr)
            sys.exit(1)
        # Use the same resolution as check_repo so repos registered only via
        # host_paths for this machine are not skipped.
        repo_slugs = [r["slug"] for r in repos if resolve_repo_path(r)]
        if not repo_slugs:
            print("No repos with local_path registered.", file=sys.stderr)
            sys.exit(0)
    else:
        repo_slugs = [args.repo]

    runner = fix_repo if args.fix else check_repo
    # --repo-path only applies to single-repo runs; silently ignored with --all
    path_override = args.repo_path if not args.all else None
    reports = [runner(args.api_base, slug, path_override) for slug in repo_slugs]

    if args.as_json:
        output = (
            report_to_dict(reports[0]) if len(reports) == 1
            else [report_to_dict(r) for r in reports]
        )
        print(json.dumps(output, indent=2))
    else:
        for report in reports:
            print(render_text(report))
            print()

    any_fail = any(r.failures for r in reports)
    any_warn = any(r.warnings for r in reports)
    sys.exit(1 if any_fail else 2 if any_warn else 0)


if __name__ == "__main__":
    main()