feat(consistency): distributed multi-machine safety (CUST-WP-0026)

T01 — No-regress rule (C-15): fix-consistency now detects when a DB task
status is ahead of, or at the same rank as, the workplan file (e.g. marked
done on CoulombCore) and emits C-15 WARN instead of regressing the DB back
to the stale file value. STATUS_ORDER ranking: todo(0) < in_progress/blocked(1) < done/cancelled(2).

T02 — Pull gate (C-16): fix_repo runs git fetch + rev-parse at the start
of every --fix run. If the local repo is behind its remote tracking branch,
all write operations are skipped and C-16 WARN is emitted. Best-effort:
offline/no-remote silently skips the check.

T03 — DB→file writeback: C-15 fix path patches the status field in the
matching task block and git-commits the change with a standard message.
--no-writeback flag disables writeback while keeping T01/T02 active.

T04 — CLAUDE.md + session-protocol.template updated with new guidance,
C-15/C-16 semantics, and fix-consistency-remote recommendation.

T05 — Makefile: fix-consistency-remote pulls then fixes in one step.

16 new tests; 155 passed total.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-26 10:19:23 +01:00
parent dff9806bb6
commit 505ace5617
4 changed files with 406 additions and 20 deletions

View File

@@ -252,6 +252,20 @@ fix-consistency:
$(if $(REPO_PATH),--repo-path "$(REPO_PATH)",); \ $(if $(REPO_PATH),--repo-path "$(REPO_PATH)",); \
e=$$?; [ $$e -eq 2 ] && exit 0 || exit $$e e=$$?; [ $$e -eq 2 ] && exit 0 || exit $$e
# fix-consistency-remote: look up the repo's local_path in state-hub,
# fast-forward that checkout, then delegate to fix-consistency with REPO_PATH
# set. A failed pull only warns; fix-consistency still runs afterwards.
# API_BASE is read as a make variable (command line or environment), the URL
# is quoted, and the slug reaches Python through the environment rather than
# being spliced into the Python source.
## Pull repo then fix consistency (safe for multi-machine workflows): make fix-consistency-remote REPO=net-kingdom
fix-consistency-remote:
	@test -n "$(REPO)" || (echo "ERROR: REPO is required. Usage: make fix-consistency-remote REPO=<slug>"; exit 1)
	$(eval _REMOTE_REPO_PATH := $(shell \
		curl -s "$(or $(API_BASE),http://127.0.0.1:8000)/repos/?slug=$(REPO)" | \
		REPO_SLUG='$(REPO)' python3 -c "import json,os,sys; \
		repos=json.load(sys.stdin); \
		print(next((r.get('local_path','') for r in repos if r.get('slug')==os.environ['REPO_SLUG']), ''))" \
	))
	@test -n "$(_REMOTE_REPO_PATH)" || (echo "ERROR: repo '$(REPO)' not found or has no local_path in state-hub"; exit 1)
	git -C "$(_REMOTE_REPO_PATH)" pull --ff-only || \
		(echo "WARN: pull failed (conflicts or no remote) — running fix-consistency anyway"; true)
	$(MAKE) fix-consistency REPO="$(REPO)" REPO_PATH="$(_REMOTE_REPO_PATH)"
## Check all registered repos for ADR-001 consistency ## Check all registered repos for ADR-001 consistency
check-consistency-all: check-consistency-all:
uv run python scripts/consistency_check.py --all $(if $(API_BASE),--api-base "$(API_BASE)",); \ uv run python scripts/consistency_check.py --all $(if $(API_BASE),--api-base "$(API_BASE)",); \

View File

@@ -19,10 +19,12 @@ Checks:
C-12 orphan-db-task WARN No DB task in workstream has no file backing C-12 orphan-db-task WARN No DB task in workstream has no file backing
C-13 workstream-auto-complete WARN Yes All DB tasks done but workstream still active C-13 workstream-auto-complete WARN Yes All DB tasks done but workstream still active
C-14 ghost-duplicate WARN No Active topic workstream with no repo_id matches a file-backed title — probable ghost from premature create_workstream() call C-14 ghost-duplicate WARN No Active topic workstream with no repo_id matches a file-backed title — probable ghost from premature create_workstream() call
C-15 task-db-ahead WARN Yes DB task status is ahead of file — regression prevented; writeback syncs file
C-16 repo-behind-remote WARN No Local repo is behind remote tracking branch — --fix skipped to avoid clobbering remote progress
Usage: Usage:
python scripts/consistency_check.py --repo SLUG [--fix] [--json] [--api-base URL] python scripts/consistency_check.py --repo SLUG [--fix] [--no-writeback] [--json] [--api-base URL]
python scripts/consistency_check.py --all [--fix] [--json] [--api-base URL] python scripts/consistency_check.py --all [--fix] [--no-writeback] [--json] [--api-base URL]
Exit codes: Exit codes:
0 — ok (no FAILs; only WARNs/INFOs) 0 — ok (no FAILs; only WARNs/INFOs)
@@ -35,6 +37,7 @@ import argparse
import json import json
import re import re
import socket import socket
import subprocess
import sys import sys
from dataclasses import dataclass, field from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
@@ -70,6 +73,16 @@ FILE_TO_DB_WORKSTREAM_STATUS: dict[str, str] = {
"done": "completed", "done": "completed",
} }
# No-regress rule (T01/C-15): ordinal rank of every task status.
# "in_progress" and "blocked" deliberately share rank 1 — both are "in flight".
STATUS_ORDER: dict[str, int] = dict(
    todo=0,
    in_progress=1,
    blocked=1,
    done=2,
    cancelled=2,
)
def normalise_workstream_status(status: str) -> str: def normalise_workstream_status(status: str) -> str:
"""Translate a workplan file status value to its DB-canonical equivalent.""" """Translate a workplan file status value to its DB-canonical equivalent."""
@@ -516,22 +529,48 @@ def check_repo(api_base: str, repo_slug: str, repo_path_override: str | None = N
fixable=False, fixable=False,
) )
continue continue
# C-10: task status drift # C-10 / C-15: task status drift
db_t_status = db_task.get("status", "") db_t_status = db_task.get("status", "")
if t_status and db_t_status and t_status != db_t_status: if t_status and db_t_status and t_status != db_t_status:
report.add( db_rank = STATUS_ORDER.get(db_t_status, 0)
severity="WARN", check_id="C-10", file_rank = STATUS_ORDER.get(t_status, 0)
message=( if db_rank >= file_rank:
f"Task status drift '{t_id}': " # C-15: DB is already at the same rank or ahead — prevent
f"file={t_status!r} db={db_t_status!r} (file wins)" # regression. Writeback syncs the file to match DB.
), report.add(
file_path=f"{fname}#{t_id}", severity="WARN", check_id="C-15",
db_id=t_sh_id, message=(
file_value=t_status, f"DB task '{t_id}' is ahead of file "
db_value=db_t_status, f"(db={db_t_status!r}, file={t_status!r}) "
fixable=True, f"— regression prevented; writeback will sync file"
_fix_context={"task_id": t_sh_id, "status": t_status}, ),
) file_path=f"{fname}#{t_id}",
db_id=t_sh_id,
file_value=t_status,
db_value=db_t_status,
fixable=True,
_fix_context={
"task_id": t_sh_id,
"wp_file": str(wp_file),
"task_block_id": t_id,
"db_status": db_t_status,
},
)
else:
# C-10: file is ahead — apply file→DB sync (normal drift)
report.add(
severity="WARN", check_id="C-10",
message=(
f"Task status drift '{t_id}': "
f"file={t_status!r} db={db_t_status!r} (file wins)"
),
file_path=f"{fname}#{t_id}",
db_id=t_sh_id,
file_value=t_status,
db_value=db_t_status,
fixable=True,
_fix_context={"task_id": t_sh_id, "status": t_status},
)
elif t_id: elif t_id:
# C-11: task exists in file but not linked to DB # C-11: task exists in file but not linked to DB
ws_status = ws.get("status", "") ws_status = ws.get("status", "")
@@ -704,13 +743,131 @@ def _check_ghost_duplicates(
) )
# ---------------------------------------------------------------------------
# Git helpers (T02 pull gate, T03 writeback)
# ---------------------------------------------------------------------------
def _detect_behind_remote(repo_path: str) -> bool:
    """Return True if the local repo is behind its remote tracking branch.

    "Behind" means the upstream branch (``@{u}``) contains at least one commit
    that is not reachable from ``HEAD``.  A checkout that is only *ahead* of
    its upstream (for example because it holds an unpushed writeback commit)
    is not behind and yields False.

    Best-effort: returns False on any error (offline, no remote, no upstream,
    git not installed, timeout) so that the caller is never blocked by
    network or configuration issues.
    """
    try:
        # Refresh remote-tracking refs.  No remote name is passed, so git uses
        # the current branch's upstream remote and falls back to "origin".
        # The result is deliberately ignored: when the fetch fails we compare
        # against the last known remote state instead.
        subprocess.run(
            ["git", "-C", repo_path, "fetch", "--quiet"],
            capture_output=True, timeout=15,
        )
        # Count commits reachable from the upstream but not from HEAD.
        behind = subprocess.run(
            ["git", "-C", repo_path, "rev-list", "--count", "HEAD..@{u}"],
            capture_output=True, text=True, timeout=5,
        )
        if behind.returncode != 0:
            # Not a repo, no commits yet, or no upstream configured.
            return False
        return int(behind.stdout.strip() or 0) > 0
    except (OSError, subprocess.SubprocessError, ValueError):
        # OSError: git missing / bad path; SubprocessError: timeout;
        # ValueError: unexpected non-numeric output.
        return False
def _patch_task_status_in_file(
    file_path: Path, task_block_id: str, new_status: str
) -> bool:
    """Update the ``status:`` field inside a task block identified by its id.

    Only modifies a ```task … ``` fenced block whose ``id:`` matches
    *task_block_id*, and within that block only the first ``status:`` line
    that already carries a value.  Returns True if the file was changed,
    False if nothing was rewritten (no matching block, or the status already
    equals *new_status*).
    """
    text = file_path.read_text(encoding="utf-8")

    def _replace(m: re.Match) -> str:
        # m.group(0) is the full ```task...``` block including fences.
        # m.group(1) is the inner YAML content (no fences).
        full_block = m.group(0)
        task_meta = _parse_yaml_block(m.group(1).strip())
        if str(task_meta.get("id", "")).strip() != task_block_id:
            return full_block
        # Replace the status value only, leave everything else untouched.
        # - "[ \t]*" rather than "\s*": whitespace matching must not cross the
        #   newline, otherwise an empty "status:" value would cause the first
        #   token of the *next* line to be overwritten.
        # - A callable replacement inserts new_status literally, so backslashes
        #   or "\g<…>" sequences in it are never interpreted as a template.
        # - count=1 keeps any later column-0 "status:" text in the same block
        #   (e.g. inside a multi-line field) intact.
        return re.sub(
            r"^(status:[ \t]*)\S+",
            lambda s: s.group(1) + new_status,
            full_block,
            count=1,
            flags=re.MULTILINE,
        )

    new_text = _TASK_BLOCK_RE.sub(_replace, text)
    if new_text != text:
        file_path.write_text(new_text, encoding="utf-8")
        return True
    return False
def _git_commit_writeback(
    repo_path: str, file_path: Path, changes: list[str]
) -> bool:
    """Stage *file_path* and commit it with a standard writeback message.

    The commit is restricted to *file_path*, so changes the user has already
    staged for other paths are not swept into the automated commit.

    Args:
        repo_path: Working tree passed to ``git -C``.
        file_path: The workplan file that was patched.
        changes: Human-readable change lines listed in the commit body.

    Returns True on success, False on any error (errors are logged to stderr
    but do not abort the consistency run).
    """
    from datetime import date as _date

    summary = "\n".join(f" - {c}" for c in changes)
    msg = (
        f"chore(consistency): sync task status from DB [auto]\n\n"
        f"Updated by fix-consistency on {_date.today().isoformat()}:\n"
        f"{summary}"
    )
    target = str(file_path)
    try:
        subprocess.run(
            ["git", "-C", repo_path, "add", "--", target],
            check=True, capture_output=True, text=True,
        )
        # The trailing pathspec makes git commit only this file and ignore
        # whatever else is currently staged in the index.
        subprocess.run(
            ["git", "-C", repo_path, "commit", "-m", msg, "--", target],
            check=True, capture_output=True, text=True,
        )
        return True
    except subprocess.CalledProcessError as e:
        # git reports some failures (e.g. "nothing to commit") on stdout.
        detail = (e.stderr or e.stdout or "").strip()
        print(
            f"WARN: git commit failed for writeback: {detail}",
            file=sys.stderr,
        )
        return False
    except OSError as e:
        # git executable missing or not runnable — still "any error".
        print(
            f"WARN: git commit failed for writeback: {e}",
            file=sys.stderr,
        )
        return False
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Fix engine # Fix engine
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def fix_repo(api_base: str, repo_slug: str, repo_path_override: str | None = None) -> ConsistencyReport: def fix_repo(
api_base: str,
repo_slug: str,
repo_path_override: str | None = None,
no_writeback: bool = False,
) -> ConsistencyReport:
"""Run checks then apply all auto-fixable issues. Returns updated report.""" """Run checks then apply all auto-fixable issues. Returns updated report."""
report = check_repo(api_base, repo_slug, repo_path_override) report = check_repo(api_base, repo_slug, repo_path_override)
# T02 — pull gate: warn and skip all write operations when local repo is
# behind its remote tracking branch.
repo_path = report.repo_path
if repo_path and _detect_behind_remote(repo_path):
report.add(
severity="WARN", check_id="C-16",
message=(
f"Repo '{repo_slug}' is behind its remote tracking branch — "
f"pull before fixing to avoid clobbering remote progress. "
f"Run: git -C {repo_path} pull --ff-only"
),
fixable=False,
)
report.fixes_applied.append(
"C-16: all write operations skipped — local repo is behind remote"
)
return report
fixable = [i for i in report.issues if i.fixable] fixable = [i for i in report.issues if i.fixable]
for issue in fixable: for issue in fixable:
@@ -878,6 +1035,35 @@ def fix_repo(api_base: str, repo_slug: str, repo_path_override: str | None = Non
f"C-12 fixed: orphan task {task_id[:8]}… cancelled (workstream finished)" f"C-12 fixed: orphan task {task_id[:8]}… cancelled (workstream finished)"
) )
elif issue.check_id == "C-15":
# T03 — writeback: DB is ahead of file — patch file to match DB.
if no_writeback:
report.fixes_applied.append(
f"C-15 skipped (--no-writeback): task '{ctx['task_block_id']}' "
f"db={ctx['db_status']!r}"
)
else:
wp_file = Path(ctx["wp_file"])
task_block_id = ctx["task_block_id"]
db_status = ctx["db_status"]
old_status = issue.file_value
if _patch_task_status_in_file(wp_file, task_block_id, db_status):
committed = _git_commit_writeback(
repo_path,
wp_file,
[f"{task_block_id}: {old_status}{db_status}"],
)
suffix = " (committed)" if committed else " (file patched, commit failed)"
report.fixes_applied.append(
f"C-15 fixed: task '{task_block_id}' "
f"{old_status}{db_status}{suffix}"
)
else:
report.fixes_applied.append(
f"C-15 SKIP: could not locate task block '{task_block_id}' "
f"in {wp_file.name}"
)
except Exception as e: except Exception as e:
report.fixes_applied.append(f"{issue.check_id} ERROR: {e}") report.fixes_applied.append(f"{issue.check_id} ERROR: {e}")
@@ -984,6 +1170,8 @@ def main() -> None:
help="Run checks against all registered repos with local_path") help="Run checks against all registered repos with local_path")
parser.add_argument("--fix", action="store_true", parser.add_argument("--fix", action="store_true",
help="Apply auto-fixable issues (status drift, repo mismatch, etc.)") help="Apply auto-fixable issues (status drift, repo mismatch, etc.)")
parser.add_argument("--no-writeback", action="store_true", dest="no_writeback",
help="Disable DB→file status writeback (C-15) while keeping other fixes")
parser.add_argument("--repo-path", metavar="PATH", default=None, parser.add_argument("--repo-path", metavar="PATH", default=None,
help="Override the local repo path (useful when the DB has a different " help="Override the local repo path (useful when the DB has a different "
"machine's path). Takes priority over host_paths and local_path.") "machine's path). Takes priority over host_paths and local_path.")
@@ -1007,10 +1195,16 @@ def main() -> None:
else: else:
repo_slugs = [args.repo] repo_slugs = [args.repo]
runner = fix_repo if args.fix else check_repo
# --repo-path only applies to single-repo runs; silently ignored with --all # --repo-path only applies to single-repo runs; silently ignored with --all
path_override = args.repo_path if not args.all else None path_override = args.repo_path if not args.all else None
reports = [runner(args.api_base, slug, path_override) for slug in repo_slugs] if args.fix:
no_wb = getattr(args, "no_writeback", False)
reports = [
fix_repo(args.api_base, slug, path_override, no_writeback=no_wb)
for slug in repo_slugs
]
else:
reports = [check_repo(args.api_base, slug, path_override) for slug in repo_slugs]
if args.as_json: if args.as_json:
output = ( output = (

View File

@@ -42,7 +42,16 @@ If no workstreams: follow First Session Protocol (`first-session.md`).
``` ```
add_progress_event(summary="...", topic_id="{TOPIC_ID}", workstream_id="<uuid>") add_progress_event(summary="...", topic_id="{TOPIC_ID}", workstream_id="<uuid>")
``` ```
If workplan files were modified: If workplan files were modified, ensure the local copy is up to date first:
```bash ```bash
git -C <repo_path> pull --ff-only
cd ~/the-custodian/state-hub && make fix-consistency REPO={REPO_SLUG} cd ~/the-custodian/state-hub && make fix-consistency REPO={REPO_SLUG}
``` ```
For repos where implementation runs on a remote machine (e.g. CoulombCore),
use the combined target which pulls before fixing:
```bash
cd ~/the-custodian/state-hub && make fix-consistency-remote REPO={REPO_SLUG}
```
**C-15** (DB task ahead of file) is normal in multi-machine workflows — writeback
will sync the file to match DB. **C-16** (repo behind remote) blocks all writes
until you pull — intentional to prevent clobbering remote progress.

View File

@@ -24,6 +24,9 @@ from consistency_check import (
ConsistencyReport, ConsistencyReport,
Issue, Issue,
FILE_TO_DB_WORKSTREAM_STATUS, FILE_TO_DB_WORKSTREAM_STATUS,
STATUS_ORDER,
_detect_behind_remote,
_patch_task_status_in_file,
get_tasks_from_workplan, get_tasks_from_workplan,
normalise_workstream_status, normalise_workstream_status,
parse_frontmatter, parse_frontmatter,
@@ -371,3 +374,169 @@ class TestNormaliseWorkstreamStatus:
def test_c04_real_drift_still_detected(self): def test_c04_real_drift_still_detected(self):
"""done (file) vs active (DB) IS real drift and must be detected.""" """done (file) vs active (DB) IS real drift and must be detected."""
assert normalise_workstream_status("done") != normalise_workstream_status("active") assert normalise_workstream_status("done") != normalise_workstream_status("active")
# ---------------------------------------------------------------------------
# STATUS_ORDER / no-regress rule (T01 / C-15)
# ---------------------------------------------------------------------------
class TestStatusOrder:
    """Pin the STATUS_ORDER ranks behind the no-regress rule.

    When the DB rank is at least the file rank the DB value is kept (C-15);
    otherwise the file value is synced to the DB (C-10).
    """

    def test_todo_is_lowest(self):
        assert STATUS_ORDER["todo"] == 0

    def test_done_and_cancelled_are_highest(self):
        done_rank = STATUS_ORDER["done"]
        cancelled_rank = STATUS_ORDER["cancelled"]
        assert done_rank == cancelled_rank
        assert cancelled_rank == 2

    def test_in_progress_and_blocked_are_mid(self):
        in_progress_rank = STATUS_ORDER["in_progress"]
        blocked_rank = STATUS_ORDER["blocked"]
        assert in_progress_rank == blocked_rank
        assert blocked_rank == 1

    def test_db_ahead_detected(self):
        """DB says done, file says todo: the DB side ranks higher."""
        db_rank, file_rank = STATUS_ORDER["done"], STATUS_ORDER["todo"]
        assert db_rank > file_rank

    def test_file_ahead_detected(self):
        """File says done, DB says todo: the file side ranks higher, so it syncs."""
        db_rank, file_rank = STATUS_ORDER["todo"], STATUS_ORDER["done"]
        assert db_rank < file_rank

    def test_same_rank_treated_as_db_ahead(self):
        """DB in_progress vs file blocked share a rank, so nothing regresses."""
        db_rank, file_rank = STATUS_ORDER["in_progress"], STATUS_ORDER["blocked"]
        assert db_rank >= file_rank

    def test_todo_to_done_is_regression(self):
        """Pushing file=todo onto DB=done would move the task backwards."""
        rank_in_db = STATUS_ORDER["done"]
        rank_in_file = STATUS_ORDER["todo"]
        # DB rank >= file rank means the C-15 path is taken, not C-10.
        assert rank_in_db >= rank_in_file
# ---------------------------------------------------------------------------
# _detect_behind_remote (T02 / C-16)
# ---------------------------------------------------------------------------
class TestDetectBehindRemote:
    """_detect_behind_remote must return False on any error (best-effort).

    The git fixtures are self-contained: they set an explicit commit identity
    and branch name so the tests do not depend on the machine's global git
    configuration, and every setup step fails loudly instead of silently
    producing a repo in an unexpected state.
    """

    # Passed to every git call so `git commit` works without a configured
    # user.name / user.email (typical on CI runners).
    _IDENT = ("-c", "user.name=Test", "-c", "user.email=test@example.invalid")

    @staticmethod
    def _git(*args):
        """Run one git fixture command; raise CalledProcessError if it fails."""
        import subprocess

        subprocess.run(
            ["git", *TestDetectBehindRemote._IDENT, *args],
            check=True, capture_output=True,
        )

    def _clone_with_upstream(self, tmp_path):
        """Create a bare "remote" plus a clone with one commit pushed to main.

        Returns ``(bare, clone)``.  The clone's ``main`` branch tracks
        ``origin/main`` afterwards.
        """
        bare = tmp_path / "bare.git"
        clone = tmp_path / "clone"
        self._git("init", "--bare", str(bare))
        # Pin the branch name on both sides rather than relying on
        # init.defaultBranch, which differs between git installations.
        self._git("-C", str(bare), "symbolic-ref", "HEAD", "refs/heads/main")
        self._git("clone", str(bare), str(clone))
        self._git("-C", str(clone), "symbolic-ref", "HEAD", "refs/heads/main")
        self._git("-C", str(clone), "commit", "--allow-empty", "-m", "init")
        # -u records origin/main as the upstream so @{u} resolves.
        self._git("-C", str(clone), "push", "-u", "origin", "HEAD")
        return bare, clone

    def test_returns_false_for_nonexistent_path(self):
        assert _detect_behind_remote("/nonexistent/path/xyz") is False

    def test_returns_false_for_non_git_dir(self, tmp_path):
        assert _detect_behind_remote(str(tmp_path)) is False

    def test_returns_false_for_repo_without_remote(self, tmp_path):
        """A local-only repo with no remote tracking branch → not behind."""
        self._git("init", str(tmp_path))
        self._git("-C", str(tmp_path), "commit", "--allow-empty", "-m", "init")
        # No remote configured → no upstream to compare with → best-effort False
        assert _detect_behind_remote(str(tmp_path)) is False

    def test_returns_false_when_up_to_date(self, tmp_path):
        """Clone from a local bare repo, no new commits → not behind."""
        _bare, clone = self._clone_with_upstream(tmp_path)
        assert _detect_behind_remote(str(clone)) is False

    def test_returns_true_when_behind(self, tmp_path):
        """Remote has a commit the clone doesn't → clone is behind."""
        bare, clone = self._clone_with_upstream(tmp_path)
        # Push a commit through a second clone (simulates remote-only progress)
        work = tmp_path / "work"
        self._git("clone", str(bare), str(work))
        self._git("-C", str(work), "commit", "--allow-empty", "-m", "remote commit")
        self._git("-C", str(work), "push", "origin", "HEAD")
        # After fetch the clone should appear behind
        assert _detect_behind_remote(str(clone)) is True
# ---------------------------------------------------------------------------
# _patch_task_status_in_file (T03 writeback)
# ---------------------------------------------------------------------------
class TestPatchTaskStatusInFile:
    """Writeback patching: only the ``status`` field of the task block whose
    id matches may change; all other content must survive untouched."""

    def _make_workplan(self, tmp_path, content: str) -> Path:
        """Write *content* to ``workplan.md`` under *tmp_path* and return its path."""
        workplan = tmp_path / "workplan.md"
        workplan.write_text(content, encoding="utf-8")
        return workplan

    def test_patches_matching_task_block(self, tmp_path):
        lines = [
            "---",
            "id: WP-001",
            "---",
            "## My Task",
            "",
            "```task",
            "id: T01",
            "status: todo",
            "priority: high",
            "```",
        ]
        workplan = self._make_workplan(tmp_path, "\n".join(lines) + "\n")
        changed = _patch_task_status_in_file(workplan, "T01", "done")
        assert changed is True
        after = workplan.read_text()
        assert "status: done" in after
        assert "status: todo" not in after
        # Fields other than status are left alone.
        assert "priority: high" in after

    def test_does_not_patch_non_matching_block(self, tmp_path):
        lines = [
            "---",
            "id: WP-001",
            "---",
            "```task",
            "id: T02",
            "status: todo",
            "```",
        ]
        workplan = self._make_workplan(tmp_path, "\n".join(lines) + "\n")
        changed = _patch_task_status_in_file(workplan, "T01", "done")
        assert changed is False
        assert "status: todo" in workplan.read_text()

    def test_patches_correct_block_among_multiple(self, tmp_path):
        header = "---\nid: WP-001\n---\n"
        first_block = "```task\nid: T01\nstatus: todo\n```\n"
        second_block = "```task\nid: T02\nstatus: in_progress\n```\n"
        workplan = self._make_workplan(tmp_path, header + first_block + second_block)
        _patch_task_status_in_file(workplan, "T02", "done")
        after = workplan.read_text()
        assert "id: T01\nstatus: todo" in after
        assert "id: T02\nstatus: done" in after

    def test_idempotent_when_already_correct(self, tmp_path):
        workplan = self._make_workplan(
            tmp_path,
            "---\nid: WP\n---\n```task\nid: T01\nstatus: done\n```\n",
        )
        changed = _patch_task_status_in_file(workplan, "T01", "done")
        # Status already matches, so the text is unchanged and nothing is written.
        assert changed is False