Files
state-hub/scripts/consistency_check.py
tegwick 075b34945f feat(consistency): fix-consistency-remote works without REPO for all repos
Adds --remote CLI flag and fix_all_remote() function. When run without a
REPO argument, the target checks all registered repos and:
- Skips repos whose local path does not exist on this machine
- Skips repos that are already clean (no fixable issues, no FAILs, not
  behind remote, only C-08 background noise allowed)
- For repos that need work: git pull --ff-only then fix_repo()

Prints a summary of CLEAN (skipped) and NOT ON THIS HOST (skipped) repos
before the detailed fix reports.

Simplifies the Makefile target from shell-level curl+git to a single
uv run call using --remote. Same flag handles both single-repo and all-repos.

Also adds _git_pull() helper and 13 new tests (71 total in consistency suite).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-26 14:38:30 +01:00

1359 lines
53 KiB
Python

#!/usr/bin/env python3
"""consistency_check.py — ADR-001 consistency checking engine.
Runs bidirectional checks between workplan files in a registered repo and the
state-hub DB. The file is always authoritative; the DB is the cache/index layer.
Checks (columns: ID, name, severity, auto-fixable, description):
C-01 workplans-dir FAIL No workplans/ directory missing
C-02 workplan-parse FAIL No Workplan file cannot be parsed
C-03 workstream-stale-ref FAIL No state_hub_workstream_id in file not in DB
C-04 workstream-status-drift WARN Yes File status != DB status (file wins)
C-05 workstream-title-drift WARN Yes File title != DB title (file wins)
C-06 workstream-unlinked WARN Yes Workplan has no state_hub_workstream_id
C-07 orphan-db-active FAIL No Active DB workstream, no backing file
C-08 orphan-db-completed INFO No Completed/archived DB workstream, no file
C-09 workstream-repo-mismatch FAIL Yes DB workstream repo_id != file location
C-10 task-status-drift WARN Yes Task status differs between file and DB
C-11 task-unlinked WARN Yes Task block has no state_hub_task_id
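C-12 orphan-db-task WARN Partial DB task in workstream has no file backing (auto-cancelled by --fix only when the workstream is completed/archived and the task is still open)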
C-13 workstream-auto-complete WARN Yes All DB tasks done but workstream still active
C-14 ghost-duplicate WARN No Active topic workstream with no repo_id matches a file-backed title — probable ghost from premature create_workstream() call
C-15 task-db-ahead WARN Yes DB task status is ahead of file — regression prevented; writeback syncs file
C-16 repo-behind-remote WARN No Local repo is behind remote tracking branch — --fix skipped to avoid clobbering remote progress
Usage:
python scripts/consistency_check.py --repo SLUG [--fix] [--no-writeback] [--json] [--api-base URL]
python scripts/consistency_check.py --all [--fix] [--no-writeback] [--json] [--api-base URL]
Exit codes:
0 — ok (no FAILs; only WARNs/INFOs)
1 — one or more FAILs present
2 — warn-only (no FAILs, but WARNs present)
"""
from __future__ import annotations
import argparse
import json
import re
import socket
import subprocess
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
try:
import yaml as _yaml
_HAS_YAML = True
except ImportError:
_HAS_YAML = False
try:
import httpx as _httpx
_HAS_HTTPX = True
except ImportError:
_HAS_HTTPX = False
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Matches a fenced ```task block; group 1 is the text between the fences.
_TASK_BLOCK_RE = re.compile(r"```task\s*\n(.*?)\n```", re.DOTALL)
# Matches a Markdown heading of level 1-4; group 1 is the heading text.
_HEADING_RE = re.compile(r"^#{1,4}\s+(.+?)$", re.MULTILINE)
# Workstream status values.
VALID_WP_STATUSES = {"active", "completed", "archived"}
# Task status / priority values; fix_repo() replaces anything outside these
# sets with "todo" / "medium" before creating a DB task.
VALID_TASK_STATUSES = {"todo", "in_progress", "blocked", "done", "cancelled"}
VALID_TASK_PRIORITIES = {"low", "medium", "high", "critical"}
# Workplan files may say "done" where the DB workstream API says "completed".
# File values are translated through this map before they are compared with,
# or PATCHed into, the DB so that the vocabulary difference alone is never
# reported as C-04 drift.
FILE_TO_DB_WORKSTREAM_STATUS: dict[str, str] = dict(done="completed")

# Rank of each task status for the no-regress rule (T01/C-15). Statuses listed
# in the same group share a rank: "in_progress" and "blocked" are both
# "in flight", "done" and "cancelled" are both terminal.
STATUS_ORDER: dict[str, int] = {
    status: rank
    for rank, group in enumerate(
        (("todo",), ("in_progress", "blocked"), ("done", "cancelled"))
    )
    for status in group
}


def normalise_workstream_status(status: str) -> str:
    """Return the DB-canonical spelling of a workplan file status.

    Values without an entry in ``FILE_TO_DB_WORKSTREAM_STATUS`` are returned
    unchanged.
    """
    if status in FILE_TO_DB_WORKSTREAM_STATUS:
        return FILE_TO_DB_WORKSTREAM_STATUS[status]
    return status
# ---------------------------------------------------------------------------
# Data types
# ---------------------------------------------------------------------------
@dataclass
class Issue:
    """One finding produced by a consistency check."""

    severity: str  # FAIL | WARN | INFO
    check_id: str  # check identifier such as "C-04"; this module emits C-00 through C-16
    message: str  # human-readable description of the finding
    file_path: str = ""  # workplan file name, optionally suffixed with "#<task id>"
    db_id: str = ""  # id of the DB workstream/task the finding refers to, if any
    file_value: str = ""  # value found in the file (for drift checks)
    db_value: str = ""  # value found in the DB (for drift checks)
    fixable: bool = False  # True when fix_repo() has an automatic fix for this finding
    # Data fix_repo() needs to apply the fix; excluded from repr().
    _fix_context: dict = field(default_factory=dict, repr=False)
@dataclass
class ConsistencyReport:
    """Findings and applied fixes collected for one repo."""

    repo_slug: str
    repo_path: str
    issues: list[Issue] = field(default_factory=list)
    fixes_applied: list[str] = field(default_factory=list)

    def add(self, **kwargs) -> Issue:
        """Build an ``Issue`` from *kwargs*, record it and return it."""
        created = Issue(**kwargs)
        self.issues.append(created)
        return created

    def _with_severity(self, level: str) -> list[Issue]:
        """Return the recorded issues whose severity equals *level*."""
        return [entry for entry in self.issues if entry.severity == level]

    @property
    def failures(self) -> list[Issue]:
        """Issues with severity FAIL."""
        return self._with_severity("FAIL")

    @property
    def warnings(self) -> list[Issue]:
        """Issues with severity WARN."""
        return self._with_severity("WARN")

    @property
    def infos(self) -> list[Issue]:
        """Issues with severity INFO."""
        return self._with_severity("INFO")
# ---------------------------------------------------------------------------
# YAML / frontmatter parsing
# ---------------------------------------------------------------------------
def _parse_yaml_block(raw: str) -> dict:
    """Parse a YAML mapping from *raw*.

    Uses PyYAML when it is installed; otherwise falls back to a minimal
    ``key: value`` line parser that only understands unindented scalar lines.

    Returns:
        The parsed mapping, ``{}`` for empty input, or
        ``{"_parse_error": True}`` when the YAML is invalid or is not a
        mapping (for example a bare string or a list). Callers use ``.get()``
        and item assignment on the result, so a non-dict value must never
        escape from here.
    """
    if _HAS_YAML:
        try:
            parsed = _yaml.safe_load(raw)
        except _yaml.YAMLError:
            return {"_parse_error": True}
        if not parsed:
            return {}
        if not isinstance(parsed, dict):
            # Valid YAML, but not a mapping: treat it as unparseable.
            return {"_parse_error": True}
        return parsed
    result: dict = {}
    for line in raw.splitlines():
        # Indented lines belong to nested structures the fallback cannot model.
        if ":" in line and not line.startswith(" "):
            k, _, v = line.partition(":")
            result[k.strip()] = v.strip().strip('"').strip("'")
    return result
def parse_frontmatter(text: str) -> tuple[dict, str]:
    """Split YAML frontmatter from the body of a workplan file.

    The frontmatter starts at the leading ``---`` and ends at the first later
    line that consists only of ``---`` (trailing blanks allowed). A ``---``
    that appears in the middle of a line, for example inside a title, is not
    treated as a delimiter.

    Returns:
        ``(meta, body)`` where *body* is everything after the closing
        delimiter, or ``({}, text)`` when *text* has no complete frontmatter.
    """
    if not text.startswith("---"):
        return {}, text
    # Start searching after the opening marker. With MULTILINE, "^" only
    # matches at real line starts, so "------" is not mistaken for a closing
    # delimiter.
    closing = re.compile(r"^---[ \t]*\r?$", re.MULTILINE).search(text, 3)
    if closing is None:
        return {}, text
    meta = _parse_yaml_block(text[3:closing.start()].strip())
    return meta, text[closing.start() + 3:]
def parse_task_blocks(body: str) -> list[dict]:
    """Parse every fenced ```task block found in *body*.

    A block that does not define ``title`` inherits the text of the closest
    heading (as matched by ``_HEADING_RE``) that starts before the block.
    """
    heading_index = [
        (found.start(), found.group(1).strip())
        for found in _HEADING_RE.finditer(body)
    ]
    parsed_tasks = []
    for block in _TASK_BLOCK_RE.finditer(body):
        entry = _parse_yaml_block(block.group(1).strip())
        if "title" not in entry:
            nearest = None
            # Headings are in document order, so stop at the first one that
            # starts at or after the block.
            for offset, heading in heading_index:
                if offset >= block.start():
                    break
                nearest = heading
            if nearest is not None:
                entry["title"] = nearest
        parsed_tasks.append(entry)
    return parsed_tasks
def get_tasks_from_workplan(meta: dict, body: str) -> list[dict]:
    """Return the tasks declared by a workplan.

    Fenced ```task blocks in *body* take precedence. When there are none, the
    ``tasks`` list from the frontmatter (activity-core style) is used, and an
    empty list is returned if that is missing or not a list.
    """
    fenced = parse_task_blocks(body)
    if not fenced:
        listed = meta.get("tasks")
        return listed if isinstance(listed, list) else []
    return fenced
# ---------------------------------------------------------------------------
# File update helpers
# ---------------------------------------------------------------------------
def _add_frontmatter_field(file_path: Path, key: str, value: str) -> None:
    """Write ``key: "value"`` as the last frontmatter line of *file_path*.

    The new line goes directly above the first ``---`` line found after the
    first line of the file. The file is left untouched when no such line
    exists.
    """
    content = file_path.read_text(encoding="utf-8")
    rows = content.split("\n")
    closing = next(
        (pos for pos in range(1, len(rows)) if rows[pos].strip() == "---"),
        None,
    )
    if closing is None:
        return
    rows[closing:closing] = [f'{key}: "{value}"']
    file_path.write_text("\n".join(rows), encoding="utf-8")
def _inject_task_id_into_block(
    file_path: Path, field_name: str, field_value: str, match_id: str
) -> bool:
    """Set *field_name* to *field_value* in the ```task block with id *match_id*.

    A block is rewritten only when its ``id`` equals *match_id* and the field
    is absent or holds a null-like placeholder. An existing ``field_name:``
    line is replaced in place; otherwise the field is appended to the block.

    Returns True if the file was changed on disk.
    """
    original = file_path.read_text(encoding="utf-8")
    null_like = ("", "~", "null", "None", "none")

    def _rewrite(found: re.Match) -> str:
        inner = found.group(1)
        parsed = _parse_yaml_block(inner.strip())
        if str(parsed.get("id", "")) != match_id:
            return found.group(0)
        current = parsed.get(field_name)
        already_set = current is not None and str(current).strip() not in null_like
        if already_set:
            return found.group(0)
        # Overwrite an existing placeholder line; append when there is none.
        updated = re.sub(
            rf"^{re.escape(field_name)}:.*$",
            f'{field_name}: "{field_value}"',
            inner,
            flags=re.MULTILINE,
        )
        if updated == inner:
            updated = inner.rstrip() + f"\n{field_name}: \"{field_value}\""
        return f"```task\n{updated}\n```"

    rewritten = _TASK_BLOCK_RE.sub(_rewrite, original)
    if rewritten == original:
        return False
    file_path.write_text(rewritten, encoding="utf-8")
    return True
def _inject_task_id_frontmatter_list(
    file_path: Path, field_value: str, match_id: str
) -> bool:
    """Inject state_hub_task_id into a task entry in the frontmatter ``tasks:`` list.

    Every mapping entry whose ``id`` equals *match_id* and whose
    ``state_hub_task_id`` is missing or a null-like placeholder receives
    *field_value*. The frontmatter is then re-serialised with its key order
    preserved and written back in front of the unchanged body.

    Returns True if the file was rewritten, False when PyYAML is unavailable,
    nothing matched, or serialising/writing failed.
    """
    if not _HAS_YAML:
        return False
    text = file_path.read_text(encoding="utf-8")
    meta, body = parse_frontmatter(text)
    tasks = meta.get("tasks") if isinstance(meta, dict) else None
    if not isinstance(tasks, list):
        return False
    # Same placeholder vocabulary check_repo() treats as "not linked".
    null_like = ("", "~", "null", "None", "none")
    changed = False
    for entry in tasks:
        if not isinstance(entry, dict):
            continue
        if str(entry.get("id", "")) != match_id:
            continue
        current = entry.get("state_hub_task_id")
        if current is not None and str(current).strip() not in null_like:
            continue  # already linked, never overwrite a real id
        entry["state_hub_task_id"] = field_value
        changed = True
    if not changed:
        return False
    try:
        # sort_keys=False keeps the author's key order instead of
        # alphabetising the whole frontmatter.
        new_meta_str = _yaml.dump(
            meta, allow_unicode=True, default_flow_style=False, sort_keys=False
        )
        file_path.write_text(f"---\n{new_meta_str}---{body}", encoding="utf-8")
    except (_yaml.YAMLError, TypeError, OSError):
        # Best-effort: the caller only needs to know the injection did not happen.
        return False
    return True
# ---------------------------------------------------------------------------
# API helpers
# ---------------------------------------------------------------------------
def _api_get(api_base: str, path: str, params: dict | None = None) -> Any:
    """GET *path* from the API and return the decoded JSON body.

    A trailing slash is appended to *path* when missing, and query parameters
    whose value is None are dropped. Returns None when httpx is not installed
    or when the request fails for any reason.
    """
    if not _HAS_HTTPX:
        return None
    url_path = path if path.endswith("/") else path + "/"
    try:
        with _httpx.Client(base_url=api_base, timeout=10.0, follow_redirects=True) as client:
            query = {name: val for name, val in (params or {}).items() if val is not None}
            response = client.get(url_path, params=query)
            response.raise_for_status()
            return response.json()
    except Exception:
        return None
def _api_patch(api_base: str, path: str, body: dict) -> Any:
    """PATCH *body* as JSON to *path* and return the decoded JSON response.

    A trailing slash is appended to *path* when missing. Returns None when
    httpx is not installed. On any request failure a ``{"_error": "<text>"}``
    dict is returned, so callers can tell an API error from success and
    report it instead of silently dropping the fix.
    """
    if not _HAS_HTTPX:
        return None
    url_path = path if path.endswith("/") else path + "/"
    try:
        with _httpx.Client(base_url=api_base, timeout=10.0, follow_redirects=True) as client:
            response = client.patch(url_path, json=body)
            response.raise_for_status()
            return response.json()
    except Exception as failure:
        return {"_error": str(failure)}
def _api_post(api_base: str, path: str, body: dict) -> Any:
    """POST *body* as JSON to *path* and return the decoded JSON response.

    A trailing slash is appended to *path* when missing. Returns None when
    httpx is not installed or when the request fails for any reason.
    """
    if not _HAS_HTTPX:
        return None
    url_path = path if path.endswith("/") else path + "/"
    try:
        with _httpx.Client(base_url=api_base, timeout=10.0, follow_redirects=True) as client:
            response = client.post(url_path, json=body)
            response.raise_for_status()
            return response.json()
    except Exception:
        return None
# ---------------------------------------------------------------------------
# Core check engine
# ---------------------------------------------------------------------------
def resolve_repo_path(repo: dict, override: str | None = None) -> str:
    """Return the filesystem path of *repo* on the current machine.

    Candidates are tried in this order and the first non-empty one wins:
      1. *override* (the explicit --repo-path CLI value)
      2. ``repo["host_paths"][<current hostname>]``, the per-machine path
         registered via POST /repos/{slug}/paths/
      3. ``repo["local_path"]``, the legacy single-path field

    An empty string is returned when none of them is set.
    """
    if override:
        return override
    per_host = repo.get("host_paths") or {}
    for candidate in (per_host.get(socket.gethostname()), repo.get("local_path")):
        if candidate:
            return candidate
    return ""
def check_repo(api_base: str, repo_slug: str, repo_path_override: str | None = None) -> ConsistencyReport:
    """Run all consistency checks for a registered repo.

    The repo record is fetched from the API, its local path is resolved for
    this machine, every ``workplans/*.md`` file is parsed and compared with
    the DB workstream and tasks it references, and finally DB workstreams
    without a backing file (C-07/C-08) and ghost duplicates (C-14) are looked
    up. This function only reads; fixes are applied by ``fix_repo()`` using
    the ``_fix_context`` attached to fixable issues.

    Args:
        api_base: Base URL of the state-hub API.
        repo_slug: Slug of the registered repo.
        repo_path_override: Explicit local path that takes precedence over the
            paths stored in the repo record.

    Returns:
        A ConsistencyReport holding every issue found.
    """
    repo = _api_get(api_base, f"/repos/{repo_slug}")
    if repo is None:
        report = ConsistencyReport(repo_slug=repo_slug, repo_path="(unknown)")
        report.add(
            severity="FAIL", check_id="C-00",
            message=f"Repo '{repo_slug}' not found in state-hub DB",
            fixable=False,
        )
        return report
    repo_id: str = repo["id"]
    repo_path: str = resolve_repo_path(repo, repo_path_override)
    report = ConsistencyReport(repo_slug=repo_slug, repo_path=repo_path)
    if not repo_path:
        report.add(
            severity="FAIL", check_id="C-00",
            message=f"Repo '{repo_slug}' has no local_path — cannot check workplan files",
            fixable=False,
        )
        # Without files every DB workstream of this repo is an orphan.
        _check_orphan_db(api_base, repo_id, set(), report)
        return report
    repo_dir = Path(repo_path)
    workplans_dir = repo_dir / "workplans"
    # C-01: workplans/ directory missing
    if not workplans_dir.is_dir():
        report.add(
            severity="FAIL", check_id="C-01",
            message=(
                "workplans/ directory missing — ADR-001 requires workplan files "
                "at <repo>/workplans/<ID>-<slug>.md"
            ),
            fixable=False,
        )
        _check_orphan_db(api_base, repo_id, set(), report)
        return report
    # Parse workplan files
    workplan_infos: list[tuple[Path, dict, str]] = []
    file_ws_ids: dict[str, tuple[Path, dict, str]] = {}  # ws_id → (file, meta, body)
    for wp_file in sorted(workplans_dir.glob("*.md")):
        try:
            text = wp_file.read_text(encoding="utf-8")
        except OSError as e:
            report.add(severity="FAIL", check_id="C-02",
                       message=f"Cannot read file: {e}", file_path=wp_file.name)
            continue
        if not text.startswith("---"):
            report.add(severity="FAIL", check_id="C-02",
                       message="No YAML frontmatter found", file_path=wp_file.name)
            continue
        meta, body = parse_frontmatter(text)
        if not meta or meta.get("_parse_error"):
            report.add(severity="FAIL", check_id="C-02",
                       message="YAML frontmatter parse error", file_path=wp_file.name)
            continue
        workplan_infos.append((wp_file, meta, body))
        ws_id = str(meta.get("state_hub_workstream_id", "")).strip().strip('"')
        if ws_id:
            file_ws_ids[ws_id] = (wp_file, meta, body)
    # Per-workplan checks
    for wp_file, meta, body in workplan_infos:
        fname = wp_file.name
        ws_id = str(meta.get("state_hub_workstream_id", "")).strip().strip('"')
        file_status = str(meta.get("status", "")).strip()
        file_title = str(meta.get("title", "")).strip()
        file_domain = str(meta.get("domain", "")).strip()
        if not ws_id:
            # C-06: workplan not linked to any DB workstream
            report.add(
                severity="WARN", check_id="C-06",
                message="Workplan has no state_hub_workstream_id — not indexed in DB",
                file_path=fname,
                file_value=file_title or fname,
                fixable=True,
                _fix_context={
                    "wp_file": str(wp_file),
                    "meta": meta,
                    "body": body,
                    "repo_id": repo_id,
                    "domain": file_domain,
                },
            )
            continue
        ws = _api_get(api_base, f"/workstreams/{ws_id}")
        if ws is None:
            # C-03: stale workstream reference
            report.add(
                severity="FAIL", check_id="C-03",
                message=f"state_hub_workstream_id {ws_id[:8]}… not found in DB (stale reference)",
                file_path=fname,
                db_id=ws_id,
                fixable=False,
            )
            continue
        # C-09: repo mismatch — file is here but DB says different repo
        db_repo_id: str = ws.get("repo_id") or ""
        if db_repo_id != repo_id:
            report.add(
                severity="FAIL", check_id="C-09",
                message=(
                    f"Workstream '{ws.get('slug')}' repo_id in DB is "
                    f"{db_repo_id[:8] if db_repo_id else 'None'} "
                    f"but backing file lives in {repo_slug} ({repo_id[:8]}…)"
                ),
                file_path=fname,
                db_id=ws_id,
                file_value=repo_id,
                db_value=db_repo_id,
                fixable=True,
                _fix_context={"ws_id": ws_id, "correct_repo_id": repo_id},
            )
            # Continue to check drift even with mismatched repo
        # C-04: status drift — normalise file value before comparing so that
        # "done" (file) vs "completed" (DB) is not treated as drift.
        db_status = ws.get("status", "")
        normalised_file_status = normalise_workstream_status(file_status)
        if file_status and db_status and normalised_file_status != db_status:
            report.add(
                severity="WARN", check_id="C-04",
                message=(
                    f"Status drift in '{ws.get('slug')}': "
                    f"file={file_status!r} db={db_status!r} (file wins)"
                ),
                file_path=fname,
                db_id=ws_id,
                file_value=file_status,
                db_value=db_status,
                fixable=True,
                _fix_context={
                    "ws_id": ws_id,
                    "field": "status",
                    "value": normalised_file_status,  # always send DB-valid value
                },
            )
        # C-05: title drift
        db_title = ws.get("title", "")
        if file_title and db_title and file_title != db_title:
            report.add(
                severity="WARN", check_id="C-05",
                message=(
                    f"Title drift in '{ws.get('slug')}': "
                    f"file={file_title!r} db={db_title!r} (file wins)"
                ),
                file_path=fname,
                db_id=ws_id,
                file_value=file_title,
                db_value=db_title,
                fixable=True,
                _fix_context={"ws_id": ws_id, "field": "title", "value": file_title},
            )
        # C-10, C-11, C-12: task-level checks
        tasks = get_tasks_from_workplan(meta, body)
        db_tasks = _api_get(api_base, "/tasks", {"workstream_id": ws_id})
        file_task_sh_ids: set[str] = set()
        for task in tasks:
            if task.get("_parse_error"):
                continue
            t_id = str(task.get("id", "")).strip()
            _raw_sh = task.get("state_hub_task_id")
            t_sh_id = "" if _raw_sh is None else str(_raw_sh).strip().strip('"')
            # Null-like placeholders count as "not linked".
            if t_sh_id in ("~", "null", "None", "none"):
                t_sh_id = ""
            t_status = str(task.get("status", "")).strip()
            if t_sh_id:
                file_task_sh_ids.add(t_sh_id)
                db_task = _api_get(api_base, f"/tasks/{t_sh_id}")
                if db_task is None:
                    report.add(
                        severity="FAIL", check_id="C-03",
                        message=f"state_hub_task_id {t_sh_id[:8]}… not found in DB",
                        file_path=f"{fname}#{t_id}",
                        db_id=t_sh_id,
                        fixable=False,
                    )
                    continue
                # C-10 / C-15: task status drift
                db_t_status = db_task.get("status", "")
                if t_status and db_t_status and t_status != db_t_status:
                    db_rank = STATUS_ORDER.get(db_t_status, 0)
                    file_rank = STATUS_ORDER.get(t_status, 0)
                    if db_rank >= file_rank:
                        # C-15: DB is already at the same rank or ahead — prevent
                        # regression. Writeback syncs the file to match DB.
                        report.add(
                            severity="WARN", check_id="C-15",
                            message=(
                                f"DB task '{t_id}' is ahead of file "
                                f"(db={db_t_status!r}, file={t_status!r}) "
                                f"— regression prevented; writeback will sync file"
                            ),
                            file_path=f"{fname}#{t_id}",
                            db_id=t_sh_id,
                            file_value=t_status,
                            db_value=db_t_status,
                            fixable=True,
                            _fix_context={
                                "task_id": t_sh_id,
                                "wp_file": str(wp_file),
                                "task_block_id": t_id,
                                "db_status": db_t_status,
                            },
                        )
                    else:
                        # C-10: file is ahead — apply file→DB sync (normal drift)
                        report.add(
                            severity="WARN", check_id="C-10",
                            message=(
                                f"Task status drift '{t_id}': "
                                f"file={t_status!r} db={db_t_status!r} (file wins)"
                            ),
                            file_path=f"{fname}#{t_id}",
                            db_id=t_sh_id,
                            file_value=t_status,
                            db_value=db_t_status,
                            fixable=True,
                            _fix_context={"task_id": t_sh_id, "status": t_status},
                        )
            elif t_id:
                # C-11: task exists in file but not linked to DB
                ws_status = ws.get("status", "")
                report.add(
                    severity="WARN", check_id="C-11",
                    message=f"Task '{t_id}' has no state_hub_task_id",
                    file_path=f"{fname}#{t_id}",
                    fixable=True,
                    _fix_context={
                        "ws_id": ws_id,
                        "ws_status": ws_status,
                        "task": task,
                        "wp_file": str(wp_file),
                        "meta": meta,
                        "body": body,
                    },
                )
        # C-12: DB tasks with no file backing
        if isinstance(db_tasks, list):
            ws_status = ws.get("status", "")
            ws_finished = ws_status in ("completed", "archived")
            for db_t in db_tasks:
                if db_t["id"] not in file_task_sh_ids:
                    db_t_status = db_t.get("status", "")
                    open_task = db_t_status not in ("done", "cancelled")
                    # Auto-cancel fixable when workstream is finished and task is open
                    fixable = ws_finished and open_task
                    report.add(
                        severity="WARN", check_id="C-12",
                        message=(
                            f"DB task '{db_t.get('title', '')}' "
                            f"(id={db_t['id'][:8]}…, status={db_t_status}) "
                            f"in workstream '{ws.get('slug')}' has no file backing"
                        ),
                        db_id=db_t["id"],
                        fixable=fixable,
                        _fix_context={
                            "task_id": db_t["id"],
                            "ws_finished": ws_finished,
                        },
                    )
        # C-13: all DB tasks done but workstream still active — worker forgot to close
        db_status = ws.get("status", "")
        if db_status == "active" and isinstance(db_tasks, list) and db_tasks:
            non_terminal = [
                t for t in db_tasks
                if t.get("status") not in ("done", "cancelled")
            ]
            if not non_terminal:
                report.add(
                    severity="WARN", check_id="C-13",
                    message=(
                        f"All {len(db_tasks)} DB tasks for '{ws.get('slug')}' are "
                        f"done/cancelled but workstream status is still 'active' — "
                        f"worker likely forgot update_workstream_status()"
                    ),
                    file_path=fname,
                    db_id=ws_id,
                    db_value="active",
                    fixable=True,
                    _fix_context={
                        "ws_id": ws_id,
                        "field": "status",
                        "value": "completed",
                    },
                )
    # C-07 / C-08: orphan DB workstreams (have repo_id=this_repo but no backing file)
    _check_orphan_db(api_base, repo_id, set(file_ws_ids.keys()), report)
    # C-14: ghost duplicate — active workstream on same topic with no repo_id whose
    # title matches a file-backed workstream. Root cause: create_workstream() called
    # before the workplan file was written; fix-consistency then created a second
    # workstream from the file, leaving the first as an invisible orphan.
    _check_ghost_duplicates(api_base, workplan_infos, file_ws_ids, report)
    return report
def _check_orphan_db(
    api_base: str, repo_id: str, file_ws_ids: set[str], report: ConsistencyReport
) -> None:
    """Report DB workstreams of this repo that no workplan file points at.

    Active orphans are recorded as C-07 (FAIL), completed/archived ones as
    C-08 (INFO). Workstreams in any other status are ignored. Nothing is
    reported when the workstream listing cannot be fetched.
    """
    candidates = _api_get(api_base, "/workstreams", {"repo_id": repo_id})
    if not isinstance(candidates, list):
        return
    for row in candidates:
        row_id = row["id"]
        if row_id in file_ws_ids:
            continue
        status = row.get("status", "")
        slug = row.get("slug", "")
        if status == "active":
            severity, check_id = "FAIL", "C-07"
            message = (
                f"Active DB workstream '{slug}' (id={row_id[:8]}…) "
                f"has no backing workplan file — ADR-001 violation"
            )
        elif status in ("completed", "archived"):
            severity, check_id = "INFO", "C-08"
            message = (
                f"Completed/archived DB workstream '{slug}' "
                f"(id={row_id[:8]}…, status={status}) has no backing workplan file"
            )
        else:
            continue
        report.add(
            severity=severity,
            check_id=check_id,
            message=message,
            db_id=row_id,
            fixable=False,
        )
def _check_ghost_duplicates(
    api_base: str,
    workplan_infos: list[tuple],
    file_ws_ids: dict[str, tuple],
    report: ConsistencyReport,
) -> None:
    """C-14: report active workstreams without a repo_id that share a title
    with a file-backed workstream on the same topic.

    Such rows are ghosts left behind when create_workstream() was called
    before the workplan file existed. Titles are compared case-insensitively
    after trimming whitespace.
    """
    # Lower-cased title → id of the workstream the file is linked to.
    title_to_ws: dict[str, str] = {}
    for _path, front, _body in workplan_infos:
        linked_id = str(front.get("state_hub_workstream_id", "")).strip().strip('"')
        key = str(front.get("title", "")).strip().lower()
        if not (key and linked_id):
            continue
        title_to_ws[key] = linked_id
    if not title_to_ws:
        return
    # Topics that the file-backed workstreams belong to; ghosts are searched per topic.
    topics: set[str] = set()
    for backed_id in file_ws_ids:
        backed = _api_get(api_base, f"/workstreams/{backed_id}")
        if backed and backed.get("topic_id"):
            topics.add(backed["topic_id"])
    for topic in topics:
        active_rows = _api_get(api_base, "/workstreams", {"topic_id": topic, "status": "active"})
        if not isinstance(active_rows, list):
            continue
        for cand in active_rows:
            cand_id = cand["id"]
            # Skip legitimately linked rows, and repo-scoped rows (covered by C-07).
            if cand_id in file_ws_ids or cand.get("repo_id"):
                continue
            twin_id = title_to_ws.get(cand.get("title", "").strip().lower())
            if twin_id is None:
                continue
            report.add(
                severity="WARN",
                check_id="C-14",
                message=(
                    f"Ghost duplicate: active workstream '{cand.get('slug')}' "
                    f"(id={cand_id[:8]}…, repo_id=null) has same title as "
                    f"file-backed workstream {twin_id[:8]}… — "
                    f"likely created via create_workstream() before workplan file existed; "
                    f"archive it"
                ),
                db_id=cand_id,
                fixable=False,
            )
# ---------------------------------------------------------------------------
# Git helpers (T02 pull gate, T03 writeback)
# ---------------------------------------------------------------------------
def _git_pull(repo_path: str) -> tuple[bool, str]:
    """Fast-forward *repo_path* with ``git pull --ff-only``.

    Returns ``(True, <git output or "already up to date">)`` on success and
    ``(False, <stderr, "pull failed", or the exception text>)`` otherwise.
    Exceptions are never propagated to the caller.
    """
    command = ["git", "-C", repo_path, "pull", "--ff-only"]
    try:
        proc = subprocess.run(command, capture_output=True, text=True, timeout=30)
    except Exception as failure:
        return False, str(failure)
    if proc.returncode != 0:
        return False, proc.stderr.strip() or "pull failed"
    return True, proc.stdout.strip() or "already up to date"
def _detect_behind_remote(repo_path: str) -> bool:
    """Return True if the upstream tracking branch has commits the local repo lacks.

    "Ahead" (local has unpushed commits) is NOT considered behind.
    "Diverged" is treated as behind (remote progress could be lost).

    Best-effort: returns False on any error (offline, no upstream, git not
    installed, etc.) so that check-only mode is never blocked by network or
    environment issues.
    """
    try:
        # No remote is named on purpose: git then fetches the remote of the
        # current branch's upstream (falling back to "origin"), which is the
        # one "@{u}" below refers to. A failed fetch is ignored; the count is
        # then based on the last known remote state.
        subprocess.run(
            ["git", "-C", repo_path, "fetch", "--quiet"],
            capture_output=True, timeout=15,
        )
        # Count commits in the upstream that are not in local:
        # git rev-list HEAD..@{u} → commits remote has that local lacks.
        result = subprocess.run(
            ["git", "-C", repo_path, "rev-list", "--count", "HEAD..@{u}"],
            capture_output=True, text=True, timeout=5,
        )
        if result.returncode != 0:
            return False  # no upstream configured or other error
        return int(result.stdout.strip() or "0") > 0
    except Exception:
        # Deliberately broad: this probe must never abort a consistency run.
        return False
def _patch_task_status_in_file(
    file_path: Path, task_block_id: str, new_status: str
) -> bool:
    """Update the ``status:`` field inside a task block identified by its id.

    Only modifies ``status:`` lines inside a ```task … ``` fenced block whose
    ``id:`` matches *task_block_id*; everything else in the file is left
    untouched. Returns True if the file was changed.
    """
    text = file_path.read_text(encoding="utf-8")

    def _status_line(m: re.Match) -> str:
        # Keep the original "status:" prefix and spacing, but make sure a
        # separator exists when the old value was empty or unspaced.
        prefix = m.group(1)
        if not prefix.endswith((" ", "\t")):
            prefix += " "
        return prefix + new_status

    def _replace(m: re.Match) -> str:
        # m.group(0) is the full ```task...``` block including fences.
        # m.group(1) is the inner YAML content (no fences).
        full_block = m.group(0)
        task_meta = _parse_yaml_block(m.group(1).strip())
        if str(task_meta.get("id", "")).strip() != task_block_id:
            return full_block
        # [ \t]* (not \s*) so an empty "status:" never swallows the newline and
        # clobbers the following line. The callable replacement keeps
        # new_status literal, so backslashes are not read as escapes.
        return re.sub(
            r"^(status:[ \t]*)\S*",
            _status_line,
            full_block,
            flags=re.MULTILINE,
        )

    new_text = _TASK_BLOCK_RE.sub(_replace, text)
    if new_text != text:
        file_path.write_text(new_text, encoding="utf-8")
        return True
    return False
def _git_commit_writeback(
    repo_path: str, file_path: Path, changes: list[str]
) -> bool:
    """Stage *file_path* and commit it with a standard writeback message.

    The commit is restricted to *file_path*, so unrelated changes that happen
    to be staged in the repo are not swept into the automatic commit.

    Returns True on success, False on any error (git exiting non-zero, or git
    not being runnable at all). Errors are logged to stderr but do not abort
    the consistency run.
    """
    from datetime import date as _date

    summary = "\n".join(f" - {c}" for c in changes)
    msg = (
        f"chore(consistency): sync task status from DB [auto]\n\n"
        f"Updated by fix-consistency on {_date.today().isoformat()}:\n"
        f"{summary}"
    )
    target = str(file_path)
    try:
        subprocess.run(
            ["git", "-C", repo_path, "add", "--", target],
            check=True, capture_output=True,
        )
        subprocess.run(
            ["git", "-C", repo_path, "commit", "-m", msg, "--", target],
            check=True, capture_output=True,
        )
    except subprocess.CalledProcessError as e:
        # git reports some failures (e.g. "nothing to commit") on stdout only.
        detail = (e.stderr or e.stdout or b"").decode(errors="replace").strip()
        print(
            f"WARN: git commit failed for writeback: {detail}",
            file=sys.stderr,
        )
        return False
    except OSError as e:
        # git binary missing or not executable.
        print(
            f"WARN: git commit failed for writeback: {e}",
            file=sys.stderr,
        )
        return False
    return True
# ---------------------------------------------------------------------------
# Fix engine
# ---------------------------------------------------------------------------
def _record_patch_outcome(
    report: ConsistencyReport, result: Any, check_id: str, what: str
) -> None:
    """Append a "fixed" or "FAILED" line for an _api_patch() result.

    Nothing is recorded when *result* is None (httpx unavailable). A result
    carrying the ``_error`` sentinel is reported as FAILED, never as fixed.
    """
    if result is None:
        return
    if "_error" in result:
        report.fixes_applied.append(f"{check_id} FAILED: {what}: {result['_error']}")
    else:
        report.fixes_applied.append(f"{check_id} fixed: {what}")


def _create_and_link_task(
    api_base: str, ws_id: str, task: dict, wp_file: Path
) -> str | None:
    """Create a DB task for a file task and write the new id back into the file.

    Invalid or missing status/priority fall back to "todo"/"medium". Returns
    the new DB task id, or None when the API call did not create a task.
    """
    t_id = str(task.get("id", "")).strip()
    t_status = str(task.get("status", "todo")).strip()
    if t_status not in VALID_TASK_STATUSES:
        t_status = "todo"
    t_priority = str(task.get("priority", "medium")).strip()
    if t_priority not in VALID_TASK_PRIORITIES:
        t_priority = "medium"
    t_data = _api_post(api_base, "/tasks", {
        "workstream_id": ws_id,
        "title": str(task.get("title", t_id)).strip() or t_id,
        "status": t_status,
        "priority": t_priority,
        "assignee": task.get("assignee") or None,
    })
    if not t_data:
        return None
    t_db_id = t_data["id"]
    # Fenced ```task blocks first; fall back to the frontmatter tasks: list.
    if not _inject_task_id_into_block(wp_file, "state_hub_task_id", t_db_id, t_id):
        _inject_task_id_frontmatter_list(wp_file, t_db_id, t_id)
    return t_db_id


def fix_repo(
    api_base: str,
    repo_slug: str,
    repo_path_override: str | None = None,
    no_writeback: bool = False,
) -> ConsistencyReport:
    """Run checks then apply all auto-fixable issues. Returns updated report.

    Args:
        api_base: Base URL of the state-hub API.
        repo_slug: Slug of the registered repo.
        repo_path_override: Explicit local path, passed through to check_repo().
        no_writeback: When True, C-15 findings are reported but the workplan
            file is not patched or committed.

    Every action taken (or skipped, or failed) is appended to
    ``report.fixes_applied``. An exception raised while fixing one issue is
    recorded as "<check id> ERROR: …" and does not stop the remaining fixes.
    """
    report = check_repo(api_base, repo_slug, repo_path_override)
    # T02 — pull gate: warn and skip all write operations when local repo is
    # behind its remote tracking branch.
    repo_path = report.repo_path
    if repo_path and _detect_behind_remote(repo_path):
        report.add(
            severity="WARN", check_id="C-16",
            message=(
                f"Repo '{repo_slug}' is behind its remote tracking branch — "
                f"pull before fixing to avoid clobbering remote progress. "
                f"Run: git -C {repo_path} pull --ff-only"
            ),
            fixable=False,
        )
        report.fixes_applied.append(
            "C-16: all write operations skipped — local repo is behind remote"
        )
        return report
    fixable = [i for i in report.issues if i.fixable]
    for issue in fixable:
        ctx = issue._fix_context
        try:
            if issue.check_id in ("C-04", "C-05", "C-13"):
                ws_id = ctx["ws_id"]
                result = _api_patch(api_base, f"/workstreams/{ws_id}",
                                    {ctx["field"]: ctx["value"]})
                _record_patch_outcome(
                    report, result, issue.check_id,
                    f"workstream {ws_id[:8]}… {ctx['field']} → {ctx['value']!r}",
                )
            elif issue.check_id == "C-06":
                wp_file = Path(ctx["wp_file"])
                meta = ctx["meta"]
                domain = ctx["domain"]
                repo_id_val = ctx["repo_id"]
                body = ctx.get("body", "")
                wp_id = str(meta.get("id", "")).strip()
                title = str(meta.get("title", "")).strip()
                # Translate file vocabulary ("done") to the DB value before
                # validating, so a finished workplan is not created as active.
                status = normalise_workstream_status(
                    str(meta.get("status", "active")).strip()
                )
                if status not in ("active", "completed", "archived"):
                    status = "active"
                # Find topic_id for this domain
                topics = _api_get(api_base, "/topics")
                topic_id = None
                if isinstance(topics, list):
                    for t in topics:
                        if t.get("domain_slug") == domain:
                            topic_id = t["id"]
                            break
                if topic_id is None:
                    report.fixes_applied.append(
                        f"C-06 SKIP {wp_id}: no topic found for domain '{domain}'"
                    )
                    continue
                slug = re.sub(r"[^a-z0-9-]", "-", wp_id.lower()).strip("-")
                ws_data = _api_post(api_base, "/workstreams", {
                    "topic_id": topic_id,
                    "repo_id": repo_id_val,
                    "slug": slug,
                    "title": title or wp_id,
                    "status": status,
                    "owner": str(meta.get("owner", "")).strip() or None,
                })
                if ws_data is None:
                    report.fixes_applied.append(
                        f"C-06 FAIL {wp_id}: could not create workstream in DB"
                    )
                    continue
                new_ws_id = ws_data["id"]
                _add_frontmatter_field(wp_file, "state_hub_workstream_id", new_ws_id)
                report.fixes_applied.append(
                    f"C-06 fixed: created workstream {new_ws_id[:8]}… "
                    f"for {wp_id}, wrote ID to {wp_file.name}"
                )
                # Create tasks and inject IDs
                for task in get_tasks_from_workplan(meta, body):
                    if task.get("_parse_error"):
                        continue
                    t_id = str(task.get("id", "")).strip()
                    if not t_id:
                        continue
                    t_db_id = _create_and_link_task(api_base, new_ws_id, task, wp_file)
                    if t_db_id:
                        report.fixes_applied.append(f" + task {t_id} → {t_db_id[:8]}…")
                    else:
                        report.fixes_applied.append(
                            f" ! task {t_id}: could not create task in DB"
                        )
            elif issue.check_id == "C-09":
                ws_id = ctx["ws_id"]
                correct_repo_id = ctx["correct_repo_id"]
                result = _api_patch(api_base, f"/workstreams/{ws_id}",
                                    {"repo_id": correct_repo_id})
                _record_patch_outcome(
                    report, result, "C-09",
                    f"workstream {ws_id[:8]}… repo_id → {correct_repo_id[:8]}…",
                )
            elif issue.check_id == "C-10":
                task_id = ctx["task_id"]
                status = ctx["status"]
                result = _api_patch(api_base, f"/tasks/{task_id}",
                                    {"status": status})
                _record_patch_outcome(
                    report, result, "C-10",
                    f"task {task_id[:8]}… status → {status!r}",
                )
            elif issue.check_id == "C-11":
                ws_id = ctx["ws_id"]
                ws_status = ctx.get("ws_status", "")
                task = ctx["task"]
                wp_file = Path(ctx["wp_file"])
                t_id = str(task.get("id", "")).strip()
                # Skip creating tasks for finished workstreams — the workstream is
                # done/archived so unlinked tasks are stale file artefacts, not gaps.
                if ws_status in ("completed", "archived"):
                    report.fixes_applied.append(
                        f"C-11 skipped: task '{t_id}' in {ws_status} workstream — not created"
                    )
                else:
                    t_db_id = _create_and_link_task(api_base, ws_id, task, wp_file)
                    if t_db_id:
                        report.fixes_applied.append(
                            f"C-11 fixed: task '{t_id}' → {t_db_id[:8]}…"
                        )
                    else:
                        report.fixes_applied.append(
                            f"C-11 FAILED: could not create task '{t_id}' in DB"
                        )
            elif issue.check_id == "C-12":
                task_id = ctx["task_id"]
                if ctx.get("ws_finished"):
                    result = _api_patch(api_base, f"/tasks/{task_id}", {"status": "cancelled"})
                    _record_patch_outcome(
                        report, result, "C-12",
                        f"orphan task {task_id[:8]}… cancelled (workstream finished)",
                    )
            elif issue.check_id == "C-15":
                # T03 — writeback: DB is ahead of file — patch file to match DB.
                if no_writeback:
                    report.fixes_applied.append(
                        f"C-15 skipped (--no-writeback): task '{ctx['task_block_id']}' "
                        f"db={ctx['db_status']!r}"
                    )
                else:
                    wp_file = Path(ctx["wp_file"])
                    task_block_id = ctx["task_block_id"]
                    db_status = ctx["db_status"]
                    old_status = issue.file_value
                    if _patch_task_status_in_file(wp_file, task_block_id, db_status):
                        committed = _git_commit_writeback(
                            repo_path,
                            wp_file,
                            [f"{task_block_id}: {old_status} → {db_status}"],
                        )
                        suffix = " (committed)" if committed else " (file patched, commit failed)"
                        report.fixes_applied.append(
                            f"C-15 fixed: task '{task_block_id}' "
                            f"{old_status} → {db_status}{suffix}"
                        )
                    else:
                        report.fixes_applied.append(
                            f"C-15 SKIP: could not locate task block '{task_block_id}' "
                            f"in {wp_file.name}"
                        )
        except Exception as e:
            # One broken fix must not prevent the remaining ones from running.
            report.fixes_applied.append(f"{issue.check_id} ERROR: {e}")
    # Record that a sync run happened for this repo
    from datetime import datetime as _datetime
    from datetime import timezone as _timezone

    now_iso = _datetime.now(_timezone.utc).isoformat()
    _api_patch(api_base, f"/repos/{repo_slug}/", {"last_state_synced_at": now_iso})
    return report
# Check IDs treated as known background noise in multi-machine setups:
#   C-08 = completed/archived DB workstream with no file (pre-ADR-001 legacy)
# Issues from this set alone do not warrant a pull+fix cycle;
# _report_needs_action() ignores them when deciding whether a repo is clean.
_BACKGROUND_CHECKS: frozenset[str] = frozenset({"C-08"})
def _report_needs_action(report: ConsistencyReport, behind_remote: bool) -> bool:
    """Decide whether a repo should go through a pull+fix cycle.

    Action is needed as soon as any one of the following holds:
      - the repo is behind its remote tracking branch,
      - the report contains at least one FAIL issue,
      - the report contains a WARN or INFO issue whose check ID is not
        listed in ``_BACKGROUND_CHECKS``.

    If none of these holds the repo counts as clean and False is returned.
    """
    if behind_remote or report.failures:
        return True
    # Only non-background WARN/INFO issues make the repo actionable.
    for issue in report.warnings + report.infos:
        if issue.check_id not in _BACKGROUND_CHECKS:
            return True
    return False
def fix_all_remote(
    api_base: str,
    no_writeback: bool = False,
) -> list[ConsistencyReport]:
    """Pull and fix every registered repo that actually needs attention.

    The repo list is fetched from ``/repos``. Each repo is then handled as
    follows:
      1. If its resolved path is empty or not a directory on this machine,
         it is recorded as "not on this host" and skipped.
      2. A read-only ``check_repo()`` pass and ``_detect_behind_remote()``
         determine whether anything needs doing.
      3. If ``_report_needs_action()`` says no, it is recorded as clean
         and skipped.
      4. Otherwise ``_git_pull()`` runs, followed by ``fix_repo()``. A failed
         pull is noted but does not stop the fix.

    The pull outcome is inserted as the first entry of the returned report's
    ``fixes_applied`` list. After the loop, one summary line per skip
    category is printed to stdout.

    Args:
        api_base: State Hub API base URL.
        no_writeback: Forwarded to ``fix_repo()``.

    Returns:
        One report per repo that was pulled and fixed; skipped repos are not
        included. An empty list (with an error on stderr) if the repo list
        could not be fetched.
    """
    registered = _api_get(api_base, "/repos")
    if not isinstance(registered, list):
        print("ERROR: Could not fetch repos from state-hub API", file=sys.stderr)
        return []

    fixed_reports: list[ConsistencyReport] = []
    clean_slugs: list[str] = []
    absent_slugs: list[str] = []

    for entry in registered:
        slug = entry["slug"]

        # Same path-resolution priority as check_repo.
        local_path = resolve_repo_path(entry)
        if not (local_path and Path(local_path).is_dir()):
            absent_slugs.append(slug)
            continue

        # Read-only pass: collect issues and check remote staleness.
        findings = check_repo(api_base, slug)
        is_behind = _detect_behind_remote(local_path)
        if not _report_needs_action(findings, is_behind):
            clean_slugs.append(slug)
            continue

        # Pull first; a failed pull is reported but the fix still runs.
        pulled, detail = _git_pull(local_path)
        if pulled:
            pull_note = f"pull: {detail}"
        else:
            pull_note = f"pull WARN: {detail} — proceeding anyway"

        fixed = fix_repo(api_base, slug, no_writeback=no_writeback)
        fixed.fixes_applied.insert(0, pull_note)
        fixed_reports.append(fixed)

    # Summary of skipped repos, emitted ahead of the caller's detailed reports.
    summary: list[str] = []
    if clean_slugs:
        summary.append(f" CLEAN (skipped): {', '.join(clean_slugs)}")
    if absent_slugs:
        summary.append(f" NOT ON THIS HOST (skipped): {', '.join(absent_slugs)}")
    if summary:
        summary.append("")  # trailing blank separator line
    for line in summary:
        print(line)

    return fixed_reports
# ---------------------------------------------------------------------------
# Output / rendering
# ---------------------------------------------------------------------------
def render_text(report: ConsistencyReport, show_info: bool = True) -> str:
    """Format a consistency report as human-readable text.

    Layout: a header (title, repo slug, repo path), then one section per
    severity in the order FAIL, WARN, INFO listing each issue's check ID,
    optional file location, ``[fixable]`` marker, message and, when either is
    set, the file/DB values. An optional "FIXES APPLIED" section follows, then
    a footer with the fail/warn/info counts and the overall result.

    Args:
        report: Report to render.
        show_info: When False the INFO section is left out; the INFO count in
            the footer is still shown.

    Returns:
        The rendered report as a single newline-joined string.
    """
    rule = "=" * 66
    out = [
        "ADR-001 Consistency Report",
        f"Repo: {report.repo_slug}",
        f"Path: {report.repo_path}",
        rule,
    ]

    severities = ["FAIL", "WARN"]
    if show_info:
        severities.append("INFO")

    for severity in severities:
        matching = [issue for issue in report.issues if issue.severity == severity]
        if not matching:
            continue
        out.append(f"\n {severity}S ({len(matching)}):")
        for issue in matching:
            where = f" [{issue.file_path}]" if issue.file_path else ""
            marker = " [fixable]" if issue.fixable else ""
            out.append(f" {issue.check_id}{where}{marker}")
            out.append(f" {issue.message}")
            if issue.file_value or issue.db_value:
                out.append(f" file={issue.file_value!r} db={issue.db_value!r}")

    applied = report.fixes_applied
    if applied:
        out.append(f"\n FIXES APPLIED ({len(applied)}):")
        out.extend(f" {entry}" for entry in applied)

    fail_count = len(report.failures)
    warn_count = len(report.warnings)
    info_count = len(report.infos)

    if fail_count:
        verdict = " RESULT: ✗ FAIL"
    elif warn_count:
        verdict = " RESULT: ✓ PASS (with warnings)"
    else:
        verdict = " RESULT: ✓ PASS"

    out.extend(
        [
            f"\n{rule}",
            f" {fail_count} fail | {warn_count} warn | {info_count} info",
            verdict,
            rule,
        ]
    )
    return "\n".join(out)
def report_to_dict(report: ConsistencyReport) -> dict:
    """Convert a consistency report into a plain dict.

    The result carries the repo slug and path, one dict per issue, the
    ``fixes_applied`` list as-is, a ``summary`` with fail/warn/info counts,
    and an overall ``result`` of ``"fail"``, ``"warn"`` or ``"pass"``
    (failures take precedence over warnings).
    """
    issue_fields = (
        "severity",
        "check_id",
        "message",
        "file_path",
        "db_id",
        "file_value",
        "db_value",
        "fixable",
    )
    issue_dicts = [
        {name: getattr(issue, name) for name in issue_fields}
        for issue in report.issues
    ]

    counts = {
        "fail": len(report.failures),
        "warn": len(report.warnings),
        "info": len(report.infos),
    }

    if report.failures:
        outcome = "fail"
    elif report.warnings:
        outcome = "warn"
    else:
        outcome = "pass"

    return {
        "repo_slug": report.repo_slug,
        "repo_path": report.repo_path,
        "issues": issue_dicts,
        "fixes_applied": report.fixes_applied,
        "summary": counts,
        "result": outcome,
    }
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
def main() -> None:
    """Parse CLI arguments, run the requested checks or fixes, and exit.

    Exactly one of ``--repo SLUG`` / ``--all`` is required. Modes:
      - default: read-only ``check_repo()`` for each selected repo;
      - ``--fix``: ``fix_repo()`` for each selected repo;
      - ``--remote`` (implies ``--fix``): with ``--repo`` the resolved path is
        pulled via ``_git_pull()`` before fixing; with ``--all`` the work is
        delegated to ``fix_all_remote()``.

    Output is one text report per repo, or JSON with ``--json`` (a single
    object for one report, a list otherwise). In ``--json`` mode every status
    line printed while the reports are being collected is sent to stderr so
    that stdout contains only the JSON document; a ``--remote --all`` run
    that produced no reports prints ``[]``.

    Exit status: 1 if any report has failures, 2 if none failed but any has
    warnings, 0 otherwise. Also exits 1 when the repo list cannot be fetched
    for ``--all``, and 0 when there is nothing to process.
    """
    import contextlib

    parser = argparse.ArgumentParser(
        description="ADR-001 consistency checker — bidirectional file↔DB validation",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--repo", metavar="SLUG",
                       help="Registered repo slug (e.g. the-custodian)")
    group.add_argument("--all", action="store_true",
                       help="Run checks against all registered repos with local_path")
    parser.add_argument("--fix", action="store_true",
                        help="Apply auto-fixable issues (status drift, repo mismatch, etc.)")
    parser.add_argument("--remote", action="store_true",
                        help="Pull each repo before fixing; when used with --all, skips repos "
                             "that are already clean (no actionable issues, not behind remote). "
                             "Implies --fix.")
    parser.add_argument("--no-writeback", action="store_true", dest="no_writeback",
                        help="Disable DB→file status writeback (C-15) while keeping other fixes")
    parser.add_argument("--repo-path", metavar="PATH", default=None,
                        help="Override the local repo path (useful when the DB has a different "
                             "machine's path). Takes priority over host_paths and local_path.")
    parser.add_argument("--api-base", default="http://127.0.0.1:8000",
                        help="State Hub API base URL")
    parser.add_argument("--json", action="store_true", dest="as_json",
                        help="Output JSON instead of human-readable text")
    args = parser.parse_args()

    no_wb = args.no_writeback
    do_fix = args.fix or args.remote

    # With --json, stdout must carry nothing but the JSON document. Status
    # lines emitted while collecting reports (the pull result below, the
    # skip summary printed by fix_all_remote) are therefore diverted to
    # stderr. In text mode nothing is redirected.
    status_sink = (
        contextlib.redirect_stdout(sys.stderr)
        if args.as_json
        else contextlib.nullcontext()
    )

    with status_sink:
        if args.remote and args.all:
            # --remote --all: smart pull+fix across all repos
            reports = fix_all_remote(args.api_base, no_writeback=no_wb)
        else:
            # Resolve repo list
            repo_slugs: list[str] = []
            if args.all:
                repos = _api_get(args.api_base, "/repos")
                if not isinstance(repos, list):
                    print("ERROR: Could not fetch repos from state-hub API", file=sys.stderr)
                    sys.exit(1)
                repo_slugs = [r["slug"] for r in repos if r.get("local_path")]
                if not repo_slugs:
                    print("No repos with local_path registered.", file=sys.stderr)
                    sys.exit(0)
            else:
                repo_slugs = [args.repo]

            # --repo-path only applies to single-repo runs; silently ignored with --all
            path_override = args.repo_path if not args.all else None

            if args.remote:
                # Single-repo remote: pull first, then fix
                slug = repo_slugs[0]
                repo = _api_get(args.api_base, f"/repos/{slug}")
                if repo:
                    path = resolve_repo_path(repo, path_override)
                else:
                    # Repo lookup failed: fall back to the explicit override, if any.
                    path = path_override or ""
                if path:
                    pull_ok, pull_msg = _git_pull(path)
                    prefix = "pull" if pull_ok else "pull WARN"
                    print(f" {prefix}: {pull_msg}")
                reports = [fix_repo(args.api_base, slug, path_override, no_writeback=no_wb)]
            elif do_fix:
                reports = [
                    fix_repo(args.api_base, slug, path_override, no_writeback=no_wb)
                    for slug in repo_slugs
                ]
            else:
                reports = [
                    check_repo(args.api_base, slug, path_override)
                    for slug in repo_slugs
                ]

    if not reports:
        # Only reachable for --remote --all when every repo was skipped.
        # Still emit a valid (empty) JSON document for machine consumers.
        if args.as_json:
            print("[]")
        sys.exit(0)

    if args.as_json:
        output = (
            report_to_dict(reports[0])
            if len(reports) == 1
            else [report_to_dict(r) for r in reports]
        )
        print(json.dumps(output, indent=2))
    else:
        for report in reports:
            print(render_text(report))
            print()

    any_fail = any(r.failures for r in reports)
    any_warn = any(r.warnings for r in reports)
    sys.exit(1 if any_fail else 2 if any_warn else 0)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()