Files
state-hub/scripts/consistency_check.py
2026-05-06 04:04:53 +02:00

1987 lines
80 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""consistency_check.py — ADR-001 consistency checking engine.
Runs bidirectional checks between workplan files in a registered repo and the
state-hub DB. The file is always authoritative; the DB is the cache/index layer.
Checks:
C-01 workplans-dir FAIL No workplans/ directory missing
C-02 workplan-parse FAIL No Workplan file cannot be parsed
C-03 workstream-stale-ref FAIL No state_hub_workstream_id in file not in DB
C-04 workstream-status-drift WARN Yes File status != DB status (file wins)
C-05 workstream-title-drift WARN Yes File title != DB title (file wins)
C-06 workstream-unlinked WARN Yes Workplan has no state_hub_workstream_id
C-07 orphan-db-active FAIL No Active DB workstream, no backing file
C-08 orphan-db-completed INFO No Completed/archived DB workstream, no file
C-09 workstream-repo-mismatch FAIL Yes DB workstream repo_id != file location
C-10 task-status-drift WARN Yes Task status differs between file and DB
C-11 task-unlinked WARN Yes Task block has no state_hub_task_id
C-12 orphan-db-task WARN No DB task in workstream has no file backing
C-13 workstream-auto-complete WARN Yes All DB tasks done but workstream still active
C-14 ghost-duplicate WARN No Active topic workstream with no repo_id matches a file-backed title — probable ghost from premature create_workstream() call
C-15 task-db-ahead WARN Yes DB task status is ahead of file — regression prevented; writeback syncs file
C-16 repo-behind-remote WARN No Local repo is behind remote tracking branch — --fix skipped to avoid clobbering remote progress
C-17 repo-ahead-push-failed WARN No Local repo has unpushed commits and push failed — writes skipped to prevent runaway divergence
C-18 archived-active-status FAIL No Archived workplan file has active/todo status
C-19 workstream-planning-drift WARN Yes planning_priority/planning_order differs between file and DB
C-20 workstream-dependency-missing WARN Yes Workplan dependency frontmatter missing from DB graph
Usage:
python scripts/consistency_check.py --repo SLUG [--fix] [--no-writeback] [--json] [--api-base URL]
python scripts/consistency_check.py --all [--fix] [--no-writeback] [--json] [--api-base URL]
python scripts/consistency_check.py --here [PATH] [--fix] [--no-writeback] [--json] [--api-base URL]
Exit codes:
0 — ok (no FAILs; only WARNs/INFOs)
1 — one or more FAILs present
2 — warn-only (no FAILs, but WARNs present)
"""
from __future__ import annotations
import argparse
import os
import json
import re
import socket
import subprocess
import sys
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any
try:
import yaml as _yaml
_HAS_YAML = True
except ImportError:
_HAS_YAML = False
try:
import httpx as _httpx
_HAS_HTTPX = True
except ImportError:
_HAS_HTTPX = False
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Matches a fenced ```task ... ``` block; group 1 is the text between the fences.
_TASK_BLOCK_RE = re.compile(r"```task\s*\n(.*?)\n```", re.DOTALL)
# Matches a Markdown heading of level 1-4 on a single line; group 1 is the heading text.
_HEADING_RE = re.compile(r"^#{1,4}\s+(.+?)$", re.MULTILINE)
# Matches an archived workplan file name: six digits, a hyphen, then the
# remaining ``*.md`` name (group 1).
_ARCHIVED_WP_RE = re.compile(r"^\d{6}-(.+\.md)$")
# Closed vocabularies for workplan/task fields.
# NOTE(review): not referenced in the visible part of the file — presumably
# used for validation further down; confirm before changing.
VALID_WP_STATUSES = {"active", "completed", "archived"}
VALID_TASK_STATUSES = {"todo", "in_progress", "blocked", "done", "cancelled"}
VALID_TASK_PRIORITIES = {"low", "medium", "high", "critical"}
VALID_DEP_RELATIONSHIPS = {"blocks", "starts_after", "informs", "soft_dependency"}
# Read once at import time from CONSISTENCY_REMOTE_ALL_MAX_SECONDS (default "300").
# int() raises ValueError at import if the variable holds a non-integer.
DEFAULT_REMOTE_ALL_MAX_SECONDS = int(os.environ.get("CONSISTENCY_REMOTE_ALL_MAX_SECONDS", "300"))
# Workplan files use task-style vocabulary ("done"); the DB workstream API uses
# "completed". This map translates file values to DB values before comparison
# and before PATCHing, so "done" vs "completed" is never flagged as C-04 drift.
FILE_TO_DB_WORKSTREAM_STATUS: dict[str, str] = {
    "done": "completed",
    "todo": "active", # workplan not yet started → active workstream in DB
}
# Ordinal ranking for task statuses used by the no-regress rule (T01/C-15).
# blocked and in_progress share rank 1 — both are "in flight".
# done and cancelled share rank 2, so neither outranks the other.
STATUS_ORDER: dict[str, int] = {
    "todo": 0,
    "in_progress": 1,
    "blocked": 1,
    "done": 2,
    "cancelled": 2,
}
def normalise_workstream_status(status: str) -> str:
    """Map a workplan-file status onto the vocabulary used by the DB.

    Values listed in ``FILE_TO_DB_WORKSTREAM_STATUS`` are translated; any
    other value is handed back untouched.
    """
    if status in FILE_TO_DB_WORKSTREAM_STATUS:
        return FILE_TO_DB_WORKSTREAM_STATUS[status]
    return status
def canonical_workplan_filename(path: Path) -> str:
    """Give the workplan's file name with any archive date prefix removed.

    Names that do not match ``_ARCHIVED_WP_RE`` come back unchanged.
    """
    filename = path.name
    return _ARCHIVED_WP_RE.sub(lambda hit: hit.group(1), filename)
def workplan_display_path(repo_dir: Path, path: Path) -> str:
    """Render *path* for reports, relative to *repo_dir* when possible.

    A path below *repo_dir* keeps its sub-directories (so ``archived/`` stays
    visible); any other path is reduced to its bare file name.
    """
    try:
        relative = path.relative_to(repo_dir)
    except ValueError:
        # *path* is not located under *repo_dir*.
        return path.name
    return str(relative)
def iter_workplan_files(workplans_dir: Path, include_archived: bool = True) -> list[Path]:
    """List workplan ``*.md`` files: root files first, then ``archived/`` ones.

    Each group is sorted on its own.  The ``archived/`` sub-directory is only
    consulted when *include_archived* is true and the directory exists.
    """
    collected = sorted(workplans_dir.glob("*.md"))
    if not include_archived:
        return collected
    archive = workplans_dir / "archived"
    if archive.is_dir():
        collected += sorted(archive.glob("*.md"))
    return collected
# ---------------------------------------------------------------------------
# Data types
# ---------------------------------------------------------------------------
@dataclass
class Issue:
    """One finding produced by a consistency check."""

    # Severity label: "FAIL", "WARN" or "INFO".
    severity: str
    # Check identifier such as "C-00", "C-04" or "C-20".
    check_id: str
    # Human-readable description of the finding.
    message: str
    # Report path of the affected file (may carry a "#<task id>" suffix).
    file_path: str = ""
    # Id of the affected DB record, when there is one.
    db_id: str = ""
    # Value found in the file / in the DB for drift-style findings.
    file_value: str = ""
    db_value: str = ""
    # Whether the check marks this finding as repairable.
    fixable: bool = False
    # Extra data attached by the check for a later repair step; kept out of repr.
    _fix_context: dict = field(default_factory=dict, repr=False)
@dataclass
class ConsistencyReport:
    """Findings (and applied fixes) collected for a single repo."""

    repo_slug: str
    repo_path: str
    issues: list[Issue] = field(default_factory=list)
    fixes_applied: list[str] = field(default_factory=list)

    def add(self, **kwargs) -> Issue:
        """Build an Issue from *kwargs*, record it, and hand it back."""
        created = Issue(**kwargs)
        self.issues.append(created)
        return created

    def _with_severity(self, level: str) -> list[Issue]:
        """Recorded issues whose severity equals *level*, in insertion order."""
        return [entry for entry in self.issues if entry.severity == level]

    @property
    def failures(self) -> list[Issue]:
        """Issues with severity "FAIL"."""
        return self._with_severity("FAIL")

    @property
    def warnings(self) -> list[Issue]:
        """Issues with severity "WARN"."""
        return self._with_severity("WARN")

    @property
    def infos(self) -> list[Issue]:
        """Issues with severity "INFO"."""
        return self._with_severity("INFO")
@contextmanager
def run_lock(name: str):
    """Hold a nonblocking process lock for long-running consistency modes.

    The lock file is ``custodian-<name>.lock`` inside the directory named by
    the ``CONSISTENCY_LOCK_DIR`` environment variable (default ``/tmp``).

    Yields:
        True when the exclusive lock was acquired, or when ``fcntl`` cannot be
        imported (no locking is attempted in that case); False when another
        holder already owns the lock.
    """
    try:
        import fcntl
    except ImportError:
        yield True
        return
    from datetime import timezone

    lock_path = Path(os.environ.get("CONSISTENCY_LOCK_DIR", "/tmp")) / f"custodian-{name}.lock"
    lock_path.parent.mkdir(parents=True, exist_ok=True)
    # Open without truncating: mode "w" would empty the file on open and wipe
    # the current holder's "<pid> <timestamp>" record even when we then fail
    # to get the lock.  The record is only rewritten once the lock is ours.
    handle = lock_path.open("a+", encoding="utf-8")
    try:
        try:
            fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except BlockingIOError:
            yield False
            return
        handle.seek(0)
        handle.truncate()
        # Same naive-UTC ISO format as before, without the deprecated utcnow().
        stamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        handle.write(f"{os.getpid()} {stamp}Z\n")
        handle.flush()
        yield True
    finally:
        try:
            fcntl.flock(handle, fcntl.LOCK_UN)
        except OSError:
            pass
        handle.close()
# ---------------------------------------------------------------------------
# YAML / frontmatter parsing
# ---------------------------------------------------------------------------
def _parse_yaml_block(raw: str) -> dict:
    """Parse a YAML string into a dict.

    With PyYAML available the text goes through ``safe_load``.  An empty
    document gives ``{}``; a YAML syntax error, or a document whose top level
    is not a mapping (a bare scalar or a list), gives
    ``{"_parse_error": True}`` so callers can always treat the result as a
    dict.

    Without PyYAML a minimal fallback reads unindented ``key: value`` lines
    and strips surrounding quotes from the value.
    """
    if _HAS_YAML:
        try:
            loaded = _yaml.safe_load(raw)
        except _yaml.YAMLError:
            return {"_parse_error": True}
        if not loaded:
            return {}
        if not isinstance(loaded, dict):
            # safe_load happily returns str/list/int; callers rely on .get().
            return {"_parse_error": True}
        return loaded
    result: dict = {}
    for line in raw.splitlines():
        if ":" in line and not line.startswith(" "):
            k, _, v = line.partition(":")
            result[k.strip()] = v.strip().strip('"').strip("'")
    return result
def parse_frontmatter(text: str) -> tuple[dict, str]:
    """Separate leading YAML frontmatter from the rest of *text*.

    Returns ``(meta, body)``.  When *text* does not begin with ``---`` or has
    no second ``---`` marker, the result is ``({}, text)``.
    """
    if not text.startswith("---"):
        return {}, text
    header, closing, remainder = text[3:].partition("---")
    if not closing:
        # Opening marker without a closing one.
        return {}, text
    return _parse_yaml_block(header.strip()), remainder
def parse_task_blocks(body: str) -> list[dict]:
    """Collect every fenced ```task``` block in *body* as a parsed dict.

    A block without its own ``title`` key receives the text of the closest
    heading (as matched by ``_HEADING_RE``) that starts before the block.
    """
    heading_marks = [
        (hit.start(), hit.group(1).strip()) for hit in _HEADING_RE.finditer(body)
    ]
    parsed = []
    for block in _TASK_BLOCK_RE.finditer(body):
        entry = _parse_yaml_block(block.group(1).strip())
        if "title" not in entry:
            block_start = block.start()
            # Headings are in document order, so scan backwards for the nearest.
            nearest = next(
                (label for offset, label in reversed(heading_marks) if offset < block_start),
                None,
            )
            if nearest is not None:
                entry["title"] = nearest
        parsed.append(entry)
    return parsed
def get_tasks_from_workplan(meta: dict, body: str) -> list[dict]:
    """Return the workplan's tasks from whichever representation it uses.

    Fenced ```task``` blocks in *body* take precedence.  When there are none,
    a ``tasks`` list in the frontmatter (activity-core style) is used; if that
    is missing or not a list the result is empty.
    """
    fenced = parse_task_blocks(body)
    if not fenced:
        embedded = meta.get("tasks")
        return embedded if isinstance(embedded, list) else []
    return fenced
def _as_list(value: Any) -> list[str]:
if value is None:
return []
if isinstance(value, list):
return [str(item).strip().strip('"') for item in value if str(item).strip()]
if isinstance(value, str):
return [item.strip().strip('"') for item in value.split(",") if item.strip()]
return [str(value).strip().strip('"')]
def _as_int_or_none(value: Any) -> int | None:
if value in (None, "", "~", "null", "None", "none"):
return None
try:
return int(value)
except (TypeError, ValueError):
return None
# ---------------------------------------------------------------------------
# File update helpers
# ---------------------------------------------------------------------------
def _add_frontmatter_field(file_path: Path, key: str, value: str) -> None:
"""Insert key: "value" into frontmatter before the closing --- line."""
text = file_path.read_text(encoding="utf-8")
lines = text.split("\n")
close_idx = None
for i, line in enumerate(lines[1:], 1):
if line.strip() == "---":
close_idx = i
break
if close_idx is None:
return
lines.insert(close_idx, f'{key}: "{value}"')
file_path.write_text("\n".join(lines), encoding="utf-8")
def _inject_task_id_into_block(
    file_path: Path, field_name: str, field_value: str, match_id: str
) -> bool:
    """Set *field_name* inside the ```task``` block whose ``id`` is *match_id*.

    A block that already carries a non-null value for the field is left alone.
    An existing ``field_name:`` line holding a null placeholder is replaced;
    otherwise the new line is appended to the block.  Returns True when the
    file was rewritten.
    """
    original = file_path.read_text(encoding="utf-8")
    empty_markers = ("", "~", "null", "None", "none")
    line_pattern = rf"^{re.escape(field_name)}:.*$"
    quoted_line = f'{field_name}: "{field_value}"'

    def _rewrite(found: re.Match) -> str:
        inner = found.group(1)
        parsed = _parse_yaml_block(inner.strip())
        if str(parsed.get("id", "")) != match_id:
            return found.group(0)
        current = parsed.get(field_name)
        if current is not None and str(current).strip() not in empty_markers:
            return found.group(0)
        # Overwrite a placeholder line when one exists ...
        updated = re.sub(line_pattern, quoted_line, inner, flags=re.MULTILINE)
        if updated == inner:
            # ... otherwise add the field at the end of the block.
            updated = inner.rstrip() + "\n" + quoted_line
        return f"```task\n{updated}\n```"

    rewritten = _TASK_BLOCK_RE.sub(_rewrite, original)
    if rewritten == original:
        return False
    file_path.write_text(rewritten, encoding="utf-8")
    return True
def _inject_task_id_frontmatter_list(
    file_path: Path, field_value: str, match_id: str
) -> bool:
    """Inject state_hub_task_id into a task entry in frontmatter tasks: list.

    Every entry of the frontmatter ``tasks`` list whose ``id`` equals
    *match_id* and that is not linked yet receives
    ``state_hub_task_id = field_value``.  "Not linked" covers both a missing
    key and a null placeholder (``~``/``null``), matching how
    ``_inject_task_id_into_block`` treats fenced task blocks.

    The whole frontmatter is re-serialised with ``yaml.dump``.

    Returns:
        True when the file was rewritten; False when PyYAML is unavailable,
        the frontmatter has no usable ``tasks`` list, nothing needed changing,
        or rewriting failed.
    """
    if not _HAS_YAML:
        return False
    import yaml
    text = file_path.read_text(encoding="utf-8")
    meta, _body = parse_frontmatter(text)
    tasks = meta.get("tasks") if isinstance(meta, dict) else None
    if not isinstance(tasks, list):
        # Covers a missing key as well as ``tasks: null``.
        return False
    changed = False
    for entry in tasks:
        if not isinstance(entry, dict) or str(entry.get("id", "")) != match_id:
            continue
        current = entry.get("state_hub_task_id")
        if current is not None and str(current).strip() not in ("", "~", "null", "None", "none"):
            continue  # already linked — never overwrite a real id
        entry["state_hub_task_id"] = field_value
        changed = True
    if not changed:
        return False
    try:
        new_meta_str = yaml.dump(meta, allow_unicode=True, default_flow_style=False)
        parts = text.split("---", 2)
        if len(parts) < 3:
            return False
        new_text = f"---\n{new_meta_str}---{parts[2]}"
        file_path.write_text(new_text, encoding="utf-8")
        return True
    except Exception:
        # Deliberately best-effort: a failed rewrite is reported as "not changed".
        return False
# ---------------------------------------------------------------------------
# API helpers
# ---------------------------------------------------------------------------
def _api_get(api_base: str, path: str, params: dict | None = None) -> Any:
    """GET *path* from the API and return the decoded JSON body.

    A trailing slash is added to *path* unless it already ends with one or
    contains a query string.  Parameters whose value is None are not sent.
    Returns None when httpx is unavailable or when anything goes wrong.
    """
    if not _HAS_HTTPX:
        return None
    needs_slash = "?" not in path and not path.endswith("/")
    if needs_slash:
        path = path + "/"
    try:
        with _httpx.Client(base_url=api_base, timeout=10.0, follow_redirects=True) as client:
            query = {name: val for name, val in (params or {}).items() if val is not None}
            response = client.get(path, params=query or None)
            response.raise_for_status()
            return response.json()
    except Exception:
        return None
def _api_patch(api_base: str, path: str, body: dict) -> Any:
    """PATCH *body* as JSON to *path* and return the decoded JSON response.

    Returns None when httpx is unavailable.  On any failure a dict of the form
    ``{"_error": "<text>"}`` is returned instead of raising, so the caller can
    tell an API error from a success and report it.
    """
    if not _HAS_HTTPX:
        return None
    target = path if path.endswith("/") else path + "/"
    try:
        with _httpx.Client(base_url=api_base, timeout=10.0, follow_redirects=True) as client:
            response = client.patch(target, json=body)
            response.raise_for_status()
            return response.json()
    except Exception as failure:
        return {"_error": str(failure)}
def _api_post(api_base: str, path: str, body: dict) -> Any:
    """POST *body* as JSON to *path* and return the decoded JSON response.

    Returns None when httpx is unavailable or when anything goes wrong.
    """
    if not _HAS_HTTPX:
        return None
    target = path if path.endswith("/") else path + "/"
    try:
        with _httpx.Client(base_url=api_base, timeout=10.0, follow_redirects=True) as client:
            response = client.post(target, json=body)
            response.raise_for_status()
            return response.json()
    except Exception:
        return None
# ---------------------------------------------------------------------------
# Core check engine
# ---------------------------------------------------------------------------
def resolve_repo_path(repo: dict, override: str | None = None) -> str:
    """Work out where the repo is checked out on this machine.

    Order of precedence:
    1. *override* (the explicit --repo-path CLI value), when non-empty
    2. ``repo["host_paths"][<current hostname>]``
    3. ``repo["local_path"]`` — the legacy single-path field
    An empty string is returned when none of these yields a value.
    """
    if override:
        return override
    per_host = (repo.get("host_paths") or {}).get(socket.gethostname())
    if per_host:
        return per_host
    legacy = repo.get("local_path")
    return legacy if legacy else ""
def _infer_slug_from_path(api_base: str, path: str) -> "tuple[str, str] | None":
"""Identify a registered repo from a local checkout path.
Strategy (in order):
1. Root-commit fingerprint — ``git rev-list --max-parents=0 HEAD`` produces
the same SHA-1 on every clone, independent of remote URL or checkout path.
Looked up via ``GET /repos/by-fingerprint?hash=<sha>``.
2. Remote URL fallback — exact string match against ``remote_url`` field.
Less reliable across machines (SSH aliases, HTTP vs HTTPS, etc.) but
useful when fingerprint is not yet stored.
Returns ``(slug, git_root)`` on success, ``None`` if no match found.
"""
try:
git_root = subprocess.check_output(
["git", "rev-parse", "--show-toplevel"],
cwd=path, stderr=subprocess.DEVNULL, text=True,
).strip()
except (subprocess.CalledProcessError, FileNotFoundError, OSError):
return None
# Strategy 1: fingerprint lookup (most reliable)
try:
fingerprint = subprocess.check_output(
["git", "rev-list", "--max-parents=0", "HEAD"],
cwd=git_root, stderr=subprocess.DEVNULL, text=True,
).strip()
except (subprocess.CalledProcessError, FileNotFoundError, OSError):
fingerprint = ""
# Get local remote URL once — used for both disambiguation and fallback
try:
remote_url = subprocess.check_output(
["git", "remote", "get-url", "origin"],
cwd=git_root, stderr=subprocess.DEVNULL, text=True,
).strip()
except (subprocess.CalledProcessError, FileNotFoundError, OSError):
remote_url = ""
if fingerprint:
# Try fingerprint + remote URL for precise match
if remote_url:
import urllib.parse as _up
candidates = _api_get(
api_base,
f"/repos/by-fingerprint?hash={fingerprint}&remote_url={_up.quote(remote_url, safe='')}",
)
if isinstance(candidates, list) and len(candidates) == 1:
return candidates[0]["slug"], git_root
# Fingerprint alone (works when repos don't share ancestry)
candidates = _api_get(api_base, f"/repos/by-fingerprint?hash={fingerprint}")
if isinstance(candidates, list):
if len(candidates) == 1:
return candidates[0]["slug"], git_root
if len(candidates) > 1:
# Disambiguate: prefer the repo whose slug appears in the git_root path
for repo in candidates:
if repo["slug"] in git_root:
return repo["slug"], git_root
# Can't disambiguate — return first match with a warning
print(
f" WARNING: {len(candidates)} repos share fingerprint {fingerprint[:12]}"
f"— using '{candidates[0]['slug']}'. "
"Set remote_url on each repo for accurate matching.",
file=sys.stderr,
)
return candidates[0]["slug"], git_root
# Strategy 2: remote URL exact match (fallback)
if remote_url:
import urllib.parse as _up
repo = _api_get(api_base, f"/repos/by-remote?url={_up.quote(remote_url, safe='')}")
if repo and isinstance(repo, dict) and "slug" in repo:
return repo["slug"], git_root
return None
def check_repo(api_base: str, repo_slug: str, repo_path_override: str | None = None) -> ConsistencyReport:
    """Run all consistency checks for a registered repo.

    The repo record is fetched from the API, its local checkout is resolved,
    and every workplan file under ``workplans/`` (including ``archived/``) is
    compared with the DB.  This function only reads files and issues GET
    requests; findings are collected in the returned report and each fixable
    finding carries the data a later repair step needs in ``_fix_context``.

    Args:
        api_base: Base URL of the state-hub API.
        repo_slug: Slug of the registered repo to check.
        repo_path_override: Optional local checkout path; takes precedence
            over the paths stored on the repo record.

    Returns:
        A ConsistencyReport.  When the repo is unknown, has no local path, or
        lacks a ``workplans/`` directory, the report holds a single FAIL
        (C-00 / C-01) plus, in the latter two cases, the orphan-DB findings.
    """

    def _meta_text(mapping: dict, key: str) -> str:
        """Stripped string value of *key*; a missing key or YAML null gives ""."""
        raw = mapping.get(key)
        return "" if raw is None else str(raw).strip()

    def _linked_id(raw: Any) -> str:
        """Normalise a state-hub id read from a file; null placeholders give ""."""
        if raw is None:
            return ""
        text = str(raw).strip().strip('"')
        return "" if text in ("~", "null", "None", "none") else text

    repo = _api_get(api_base, f"/repos/{repo_slug}")
    if repo is None:
        report = ConsistencyReport(repo_slug=repo_slug, repo_path="(unknown)")
        report.add(
            severity="FAIL", check_id="C-00",
            message=f"Repo '{repo_slug}' not found in state-hub DB",
            fixable=False,
        )
        return report
    repo_id: str = repo["id"]
    repo_path: str = resolve_repo_path(repo, repo_path_override)
    report = ConsistencyReport(repo_slug=repo_slug, repo_path=repo_path)
    if not repo_path:
        report.add(
            severity="FAIL", check_id="C-00",
            message=f"Repo '{repo_slug}' has no local_path — cannot check workplan files",
            fixable=False,
        )
        _check_orphan_db(api_base, repo_id, set(), report)
        return report
    repo_dir = Path(repo_path)
    workplans_dir = repo_dir / "workplans"

    # C-01: workplans/ directory missing
    if not workplans_dir.is_dir():
        report.add(
            severity="FAIL", check_id="C-01",
            message=(
                "workplans/ directory missing — ADR-001 requires workplan files "
                "at <repo>/workplans/<ID>-<slug>.md"
            ),
            fixable=False,
        )
        _check_orphan_db(api_base, repo_id, set(), report)
        return report

    # Parse workplan files
    workplan_infos: list[tuple[Path, dict, str]] = []
    file_ws_ids: dict[str, tuple[Path, dict, str]] = {}  # ws_id → (file, meta, body)
    active_file_ws_ids: set[str] = set()
    for wp_file in iter_workplan_files(workplans_dir):
        try:
            text = wp_file.read_text(encoding="utf-8")
        except OSError as e:
            report.add(severity="FAIL", check_id="C-02",
                       message=f"Cannot read file: {e}", file_path=workplan_display_path(repo_dir, wp_file))
            continue
        if not text.startswith("---"):
            report.add(severity="FAIL", check_id="C-02",
                       message="No YAML frontmatter found", file_path=workplan_display_path(repo_dir, wp_file))
            continue
        meta, body = parse_frontmatter(text)
        # Frontmatter whose top level is not a mapping is as unusable as a
        # syntax error, so it is reported the same way instead of crashing.
        if not meta or not isinstance(meta, dict) or meta.get("_parse_error"):
            report.add(severity="FAIL", check_id="C-02",
                       message="YAML frontmatter parse error", file_path=workplan_display_path(repo_dir, wp_file))
            continue
        workplan_infos.append((wp_file, meta, body))
        ws_id = _linked_id(meta.get("state_hub_workstream_id"))
        if ws_id:
            file_ws_ids[ws_id] = (wp_file, meta, body)
            # Only files directly in workplans/ (not archived/) count as active backing.
            if wp_file.parent == workplans_dir:
                active_file_ws_ids.add(ws_id)

    # Lookup tables used to resolve dependency frontmatter (C-20).
    workplan_id_to_ws_id: dict[str, str] = {}
    task_file_id_to_sh_id: dict[str, str] = {}
    for _wp_file, meta, body in workplan_infos:
        mapped_ws_id = _linked_id(meta.get("state_hub_workstream_id"))
        wp_id = _meta_text(meta, "id")
        if wp_id and mapped_ws_id:
            workplan_id_to_ws_id[wp_id] = mapped_ws_id
        for task in get_tasks_from_workplan(meta, body):
            if not isinstance(task, dict) or task.get("_parse_error"):
                continue
            task_file_id = _meta_text(task, "id")
            task_sh_id = _linked_id(task.get("state_hub_task_id"))
            if task_file_id and task_sh_id:
                task_file_id_to_sh_id[task_file_id] = task_sh_id

    # Per-workplan checks
    for wp_file, meta, body in workplan_infos:
        fname = workplan_display_path(repo_dir, wp_file)
        archived_file = wp_file.parent.name == "archived"
        ws_id = _linked_id(meta.get("state_hub_workstream_id"))
        file_status = _meta_text(meta, "status")
        file_title = _meta_text(meta, "title")
        file_domain = _meta_text(meta, "domain")
        if archived_file and normalise_workstream_status(file_status) == "active":
            report.add(
                severity="FAIL", check_id="C-18",
                message="Archived workplan file has active/todo status",
                file_path=fname,
                file_value=file_status,
                fixable=False,
            )
        if not ws_id:
            # C-06: workplan not linked to any DB workstream
            report.add(
                severity="WARN", check_id="C-06",
                message="Workplan has no state_hub_workstream_id — not indexed in DB",
                file_path=fname,
                file_value=file_title or fname,
                fixable=True,
                _fix_context={
                    "wp_file": str(wp_file),
                    "meta": meta,
                    "body": body,
                    "repo_id": repo_id,
                    "domain": file_domain,
                },
            )
            continue
        ws = _api_get(api_base, f"/workstreams/{ws_id}")
        if ws is None:
            # C-03: stale workstream reference
            report.add(
                severity="FAIL", check_id="C-03",
                message=f"state_hub_workstream_id {ws_id[:8]}… not found in DB (stale reference)",
                file_path=fname,
                db_id=ws_id,
                fixable=False,
            )
            continue

        # C-09: repo mismatch — file is here but DB says different repo
        db_repo_id: str = ws.get("repo_id") or ""
        if db_repo_id != repo_id:
            report.add(
                severity="FAIL", check_id="C-09",
                message=(
                    f"Workstream '{ws.get('slug')}' repo_id in DB is "
                    f"{db_repo_id[:8] if db_repo_id else 'None'} "
                    f"but backing file lives in {repo_slug} ({repo_id[:8]}…)"
                ),
                file_path=fname,
                db_id=ws_id,
                file_value=repo_id,
                db_value=db_repo_id,
                fixable=True,
                _fix_context={"ws_id": ws_id, "correct_repo_id": repo_id},
            )
            # Continue to check drift even with mismatched repo

        # C-04: status drift — normalise file value before comparing so that
        # "done" (file) vs "completed" (DB) is not treated as drift.
        db_status = ws.get("status", "")
        normalised_file_status = normalise_workstream_status(file_status)
        if file_status and db_status and normalised_file_status != db_status:
            report.add(
                severity="WARN", check_id="C-04",
                message=(
                    f"Status drift in '{ws.get('slug')}': "
                    f"file={file_status!r} db={db_status!r} (file wins)"
                ),
                file_path=fname,
                db_id=ws_id,
                file_value=file_status,
                db_value=db_status,
                fixable=True,
                _fix_context={
                    "ws_id": ws_id,
                    "field": "status",
                    "value": normalised_file_status,  # always send DB-valid value
                },
            )

        # C-05: title drift
        db_title = ws.get("title", "")
        if file_title and db_title and file_title != db_title:
            report.add(
                severity="WARN", check_id="C-05",
                message=(
                    f"Title drift in '{ws.get('slug')}': "
                    f"file={file_title!r} db={db_title!r} (file wins)"
                ),
                file_path=fname,
                db_id=ws_id,
                file_value=file_title,
                db_value=db_title,
                fixable=True,
                _fix_context={"ws_id": ws_id, "field": "title", "value": file_title},
            )

        # C-19: planning metadata drift.  A YAML null in the file counts as
        # "unset", not as the literal string "None".
        planning_priority = _meta_text(meta, "planning_priority") or None
        if planning_priority != (ws.get("planning_priority") or None):
            report.add(
                severity="WARN", check_id="C-19",
                message=(
                    f"Planning priority drift in '{ws.get('slug')}': "
                    f"file={planning_priority!r} db={ws.get('planning_priority')!r} (file wins)"
                ),
                file_path=fname,
                db_id=ws_id,
                file_value=planning_priority,
                db_value=ws.get("planning_priority"),
                fixable=True,
                _fix_context={"ws_id": ws_id, "field": "planning_priority", "value": planning_priority},
            )
        planning_order = _as_int_or_none(meta.get("planning_order"))
        if planning_order != ws.get("planning_order"):
            report.add(
                severity="WARN", check_id="C-19",
                message=(
                    f"Planning order drift in '{ws.get('slug')}': "
                    f"file={planning_order!r} db={ws.get('planning_order')!r} (file wins)"
                ),
                file_path=fname,
                db_id=ws_id,
                file_value=planning_order,
                db_value=ws.get("planning_order"),
                fixable=True,
                _fix_context={"ws_id": ws_id, "field": "planning_order", "value": planning_order},
            )

        # C-10, C-11, C-12: task-level checks
        tasks = get_tasks_from_workplan(meta, body)
        db_tasks = _api_get(api_base, "/tasks", {"workstream_id": ws_id})
        db_task_by_id: dict[str, dict] = {}
        if isinstance(db_tasks, list):
            for t in db_tasks:
                db_task_by_id[t["id"]] = t

        # C-20: dependency frontmatter vs DB dependency graph
        existing_deps = _api_get(api_base, f"/workstreams/{ws_id}/dependencies") or []
        existing_dep_keys = set()
        if isinstance(existing_deps, list):
            for dep in existing_deps:
                if dep.get("from_workstream_id") != ws_id:
                    continue
                rel = dep.get("relationship_type") or "blocks"
                if dep.get("to_workstream_id"):
                    existing_dep_keys.add(("workstream", dep["to_workstream_id"], rel))
                if dep.get("to_task_id"):
                    existing_dep_keys.add(("task", dep["to_task_id"], rel))
        for target_wp_id in _as_list(meta.get("depends_on_workplans")):
            target_ws_id = workplan_id_to_ws_id.get(target_wp_id)
            if not target_ws_id:
                report.add(
                    severity="WARN",
                    check_id="C-20",
                    message=f"Workplan dependency target '{target_wp_id}' is not linked to State Hub",
                    file_path=fname,
                    file_value=target_wp_id,
                    fixable=False,
                )
                continue
            dep_key = ("workstream", target_ws_id, "blocks")
            if dep_key not in existing_dep_keys:
                report.add(
                    severity="WARN",
                    check_id="C-20",
                    message=f"Missing DB dependency edge: {ws_id[:8]}… depends on workplan {target_wp_id}",
                    file_path=fname,
                    db_id=ws_id,
                    file_value=target_wp_id,
                    fixable=True,
                    _fix_context={
                        "from_workstream_id": ws_id,
                        "to_workstream_id": target_ws_id,
                        "relationship_type": "blocks",
                    },
                )
        for target_task_id in _as_list(meta.get("depends_on_tasks")):
            target_sh_id = task_file_id_to_sh_id.get(target_task_id)
            if not target_sh_id:
                report.add(
                    severity="WARN",
                    check_id="C-20",
                    message=f"Task dependency target '{target_task_id}' is not linked to State Hub",
                    file_path=fname,
                    file_value=target_task_id,
                    fixable=False,
                )
                continue
            dep_key = ("task", target_sh_id, "starts_after")
            if dep_key not in existing_dep_keys:
                report.add(
                    severity="WARN",
                    check_id="C-20",
                    message=f"Missing DB dependency edge: {ws_id[:8]}… starts after task {target_task_id}",
                    file_path=fname,
                    db_id=ws_id,
                    file_value=target_task_id,
                    fixable=True,
                    _fix_context={
                        "from_workstream_id": ws_id,
                        "to_task_id": target_sh_id,
                        "relationship_type": "starts_after",
                    },
                )

        file_task_sh_ids: set[str] = set()
        for task in tasks:
            if not isinstance(task, dict) or task.get("_parse_error"):
                continue
            t_id = _meta_text(task, "id")
            t_sh_id = _linked_id(task.get("state_hub_task_id"))
            t_status = _meta_text(task, "status")
            if t_sh_id:
                file_task_sh_ids.add(t_sh_id)
                # Reuse the workstream's task list when it already holds this
                # task; only ids outside that list need their own request.
                db_task = db_task_by_id.get(t_sh_id)
                if db_task is None:
                    db_task = _api_get(api_base, f"/tasks/{t_sh_id}")
                if db_task is None:
                    report.add(
                        severity="FAIL", check_id="C-03",
                        message=f"state_hub_task_id {t_sh_id[:8]}… not found in DB",
                        file_path=f"{fname}#{t_id}",
                        db_id=t_sh_id,
                        fixable=False,
                    )
                    continue
                # C-10 / C-15: task status drift
                db_t_status = db_task.get("status", "")
                if t_status and db_t_status and t_status != db_t_status:
                    db_rank = STATUS_ORDER.get(db_t_status, 0)
                    file_rank = STATUS_ORDER.get(t_status, 0)
                    if db_rank >= file_rank:
                        # C-15: DB is already at the same rank or ahead — prevent
                        # regression. Writeback syncs the file to match DB.
                        report.add(
                            severity="WARN", check_id="C-15",
                            message=(
                                f"DB task '{t_id}' is ahead of file "
                                f"(db={db_t_status!r}, file={t_status!r}) "
                                f"— regression prevented; writeback will sync file"
                            ),
                            file_path=f"{fname}#{t_id}",
                            db_id=t_sh_id,
                            file_value=t_status,
                            db_value=db_t_status,
                            fixable=True,
                            _fix_context={
                                "task_id": t_sh_id,
                                "wp_file": str(wp_file),
                                "task_block_id": t_id,
                                "db_status": db_t_status,
                            },
                        )
                    else:
                        # C-10: file is ahead — apply file→DB sync (normal drift)
                        report.add(
                            severity="WARN", check_id="C-10",
                            message=(
                                f"Task status drift '{t_id}': "
                                f"file={t_status!r} db={db_t_status!r} (file wins)"
                            ),
                            file_path=f"{fname}#{t_id}",
                            db_id=t_sh_id,
                            file_value=t_status,
                            db_value=db_t_status,
                            fixable=True,
                            _fix_context={"task_id": t_sh_id, "status": t_status},
                        )
            elif t_id:
                # C-11: task exists in file but not linked to DB
                ws_status = ws.get("status", "")
                report.add(
                    severity="WARN", check_id="C-11",
                    message=f"Task '{t_id}' has no state_hub_task_id",
                    file_path=f"{fname}#{t_id}",
                    fixable=True,
                    _fix_context={
                        "ws_id": ws_id,
                        "ws_status": ws_status,
                        "task": task,
                        "wp_file": str(wp_file),
                        "meta": meta,
                        "body": body,
                    },
                )

        # C-12: DB tasks with no file backing
        if isinstance(db_tasks, list):
            ws_status = ws.get("status", "")
            ws_finished = ws_status in ("completed", "archived")
            for db_t in db_tasks:
                if db_t["id"] not in file_task_sh_ids:
                    db_t_status = db_t.get("status", "")
                    open_task = db_t_status not in ("done", "cancelled")
                    # Auto-cancel fixable when workstream is finished and task is open
                    fixable = ws_finished and open_task
                    report.add(
                        severity="WARN", check_id="C-12",
                        message=(
                            f"DB task '{db_t.get('title', '')}' "
                            f"(id={db_t['id'][:8]}…, status={db_t_status}) "
                            f"in workstream '{ws.get('slug')}' has no file backing"
                        ),
                        db_id=db_t["id"],
                        fixable=fixable,
                        _fix_context={
                            "task_id": db_t["id"],
                            "ws_finished": ws_finished,
                        },
                    )

        # C-13: all DB tasks done but workstream still active — worker forgot to close
        db_status = ws.get("status", "")
        if db_status == "active" and isinstance(db_tasks, list) and db_tasks:
            non_terminal = [
                t for t in db_tasks
                if t.get("status") not in ("done", "cancelled")
            ]
            if not non_terminal:
                report.add(
                    severity="WARN", check_id="C-13",
                    message=(
                        f"All {len(db_tasks)} DB tasks for '{ws.get('slug')}' are "
                        f"done/cancelled but workstream status is still 'active' — "
                        f"worker likely forgot update_workstream_status()"
                    ),
                    file_path=fname,
                    db_id=ws_id,
                    db_value="active",
                    fixable=True,
                    _fix_context={
                        "ws_id": ws_id,
                        "field": "status",
                        "value": "completed",
                    },
                )

    # C-07 / C-08: orphan DB workstreams (have repo_id=this_repo but no backing file)
    _check_orphan_db(api_base, repo_id, set(file_ws_ids.keys()), report, active_file_ws_ids)
    # C-14: ghost duplicate — active workstream on same topic with no repo_id whose
    # title matches a file-backed workstream. Root cause: create_workstream() called
    # before the workplan file was written; fix-consistency then created a second
    # workstream from the file, leaving the first as an invisible orphan.
    _check_ghost_duplicates(api_base, workplan_infos, file_ws_ids, report)
    return report
def _check_orphan_db(
    api_base: str,
    repo_id: str,
    file_ws_ids: set[str],
    report: ConsistencyReport,
    active_file_ws_ids: set[str] | None = None,
) -> None:
    """Flag DB workstreams with repo_id=this_repo that have no backing workplan file.

    Args:
        api_base: Base URL of the state-hub API.
        repo_id: DB id of the repo whose workstreams are listed.
        file_ws_ids: Workstream ids referenced by any workplan file,
            archived ones included.
        report: Report that receives the C-07 / C-08 findings.
        active_file_ws_ids: Workstream ids referenced by non-archived files.
            When omitted (None), *file_ws_ids* is used.  An explicitly passed
            empty set is honoured: it means no active file backs anything.

    An ``active`` DB workstream must be backed by an active file (else C-07,
    FAIL); a ``completed``/``archived`` one may be backed by any file (else
    C-08, INFO).  Workstreams in any other status are not reported.
    """
    # ``or`` would treat an empty set like "not supplied" and silently fall
    # back to all file ids, hiding C-07 when every linked file is archived.
    if active_file_ws_ids is None:
        active_file_ws_ids = file_ws_ids
    all_ws = _api_get(api_base, "/workstreams", {"repo_id": repo_id})
    if not isinstance(all_ws, list):
        return
    for ws in all_ws:
        ws_id = ws["id"]
        ws_status = ws.get("status", "")
        if ws_status == "active" and ws_id in active_file_ws_ids:
            continue
        if ws_status in ("completed", "archived") and ws_id in file_ws_ids:
            continue
        ws_slug = ws.get("slug", "")
        if ws_status == "active":
            report.add(
                severity="FAIL", check_id="C-07",
                message=(
                    f"Active DB workstream '{ws_slug}' (id={ws_id[:8]}…) "
                    f"has no backing workplan file — ADR-001 violation"
                ),
                db_id=ws_id,
                fixable=False,
            )
        elif ws_status in ("completed", "archived"):
            report.add(
                severity="INFO", check_id="C-08",
                message=(
                    f"Completed/archived DB workstream '{ws_slug}' "
                    f"(id={ws_id[:8]}…, status={ws_status}) has no backing workplan file"
                ),
                db_id=ws_id,
                fixable=False,
            )
def _check_ghost_duplicates(
api_base: str,
workplan_infos: list[tuple],
file_ws_ids: dict[str, tuple],
report: ConsistencyReport,
) -> None:
"""C-14: detect active workstreams with repo_id=null whose title matches a
file-backed workstream — these are ghosts created by premature create_workstream()
calls before the workplan file existed.
"""
# Build lookup: normalised title → file-backed workstream id
file_titles: dict[str, str] = {}
for _, meta, _ in workplan_infos:
ws_id = str(meta.get("state_hub_workstream_id", "")).strip().strip('"')
title = str(meta.get("title", "")).strip().lower()
if title and ws_id:
file_titles[title] = ws_id
if not file_titles:
return
# Gather topic_ids from all file-backed workstreams so we can query by topic
topic_ids: set[str] = set()
for ws_id in file_ws_ids:
ws = _api_get(api_base, f"/workstreams/{ws_id}")
if ws and ws.get("topic_id"):
topic_ids.add(ws["topic_id"])
for topic_id in topic_ids:
topic_ws = _api_get(api_base, "/workstreams", {"topic_id": topic_id, "status": "active"})
if not isinstance(topic_ws, list):
continue
for ws in topic_ws:
ws_id = ws["id"]
if ws_id in file_ws_ids:
continue # legitimately linked
if ws.get("repo_id"):
continue # C-07 covers repo-scoped orphans
ws_title = ws.get("title", "").strip().lower()
if ws_title in file_titles:
file_backed_id = file_titles[ws_title]
report.add(
severity="WARN",
check_id="C-14",
message=(
f"Ghost duplicate: active workstream '{ws.get('slug')}' "
f"(id={ws_id[:8]}…, repo_id=null) has same title as "
f"file-backed workstream {file_backed_id[:8]}… — "
f"likely created via create_workstream() before workplan file existed; "
f"archive it"
),
db_id=ws_id,
fixable=False,
)
# ---------------------------------------------------------------------------
# Git sync (T02–T04: pull gate, writeback, push seal) — see repo_sync.py
# ---------------------------------------------------------------------------
# repo_sync.py owns the push-seal invariant and all git lifecycle primitives.
# The aliases below keep internal call sites and existing tests unchanged.
from repo_sync import ( # noqa: E402 (import after top-level imports intentional)
count_local_ahead as _detect_ahead_of_remote,
count_remote_ahead as _count_remote_ahead,
pull_ff as _git_pull,
push_ff as _git_push,
)
def _detect_behind_remote(repo_path: str) -> bool:
    """Report whether the remote holds commits the local repo lacks (C-16 predicate).

    The count comes from ``repo_sync.count_remote_ahead`` (imported in this
    module as ``_count_remote_ahead``); any positive count means "behind".
    NOTE(review): that helper is expected to fetch before counting and to
    yield 0 on error, so a failure never triggers C-16 — confirm in
    repo_sync.py.
    """
    commits_missing_locally = _count_remote_ahead(repo_path)
    return commits_missing_locally > 0
def _patch_task_status_in_file(
    file_path: Path, task_block_id: str, new_status: str
) -> bool:
    """Update the ``status:`` field inside a task block identified by its id.

    Only lines inside a fenced task block (as matched by ``_TASK_BLOCK_RE``)
    whose parsed ``id`` equals *task_block_id* are touched.

    Args:
        file_path: Workplan file to patch in place (read and written as UTF-8).
        task_block_id: Value of the ``id:`` key of the task block to update.
        new_status: Status text to write; inserted literally.

    Returns:
        True if the file content changed and was written back, False otherwise
        (block not found, no ``status:`` line, or status already equal).
    """
    text = file_path.read_text(encoding="utf-8")

    def _set_status(line_match: re.Match) -> str:
        # Keep the original "status:" prefix and spacing; make sure there is a
        # space after the colon when the old value was empty.
        prefix = line_match.group(1)
        if prefix.endswith(":"):
            prefix += " "
        return prefix + new_status

    def _replace(m: re.Match) -> str:
        # m.group(0) is the full fenced block, m.group(1) the inner YAML.
        full_block = m.group(0)
        task_meta = _parse_yaml_block(m.group(1).strip())
        if str(task_meta.get("id", "")).strip() != task_block_id:
            return full_block
        # Replace the status value only, leaving everything else untouched.
        # ``[ \t]*`` (not ``\s*``) keeps the match on one line, so an empty
        # ``status:`` can never swallow the first token of the next line.
        # A callable replacement stops backslashes or ``\g<…>`` sequences in
        # *new_status* from being interpreted as a template.
        return re.sub(
            r"^(status:[ \t]*)\S*",
            _set_status,
            full_block,
            flags=re.MULTILINE,
        )

    new_text = _TASK_BLOCK_RE.sub(_replace, text)
    if new_text != text:
        file_path.write_text(new_text, encoding="utf-8")
        return True
    return False
def _git_commit_writeback(
    repo_path: str, file_path: Path, changes: list[str]
) -> bool:
    """Stage *file_path* and commit it with the standard writeback message.

    Args:
        repo_path: Repository root, passed to ``git -C``.
        file_path: File to stage with ``git add``.
        changes: Human-readable change lines listed in the commit body.

    Returns:
        True when both ``git add`` and ``git commit`` succeeded. False on any
        failure: a non-zero git exit status, or git being impossible to
        launch (e.g. the binary is missing). Failures are reported on stderr
        and never abort the consistency run.
    """
    from datetime import date as _date
    import os as _os

    summary = "\n".join(f" - {c}" for c in changes)
    msg = (
        f"chore(consistency): sync task status from DB [auto]\n\n"
        f"Updated by fix-consistency on {_date.today().isoformat()}:\n"
        f"{summary}"
    )
    # Pass GIT_CUSTODIAN_SYNC=1 so the post-commit hook can detect it is
    # running from within a sync pass and exit early, preventing re-entrancy.
    sync_env = {**_os.environ, "GIT_CUSTODIAN_SYNC": "1"}
    try:
        subprocess.run(
            ["git", "-C", repo_path, "add", str(file_path)],
            check=True, capture_output=True, env=sync_env,
        )
        subprocess.run(
            ["git", "-C", repo_path, "commit", "-m", msg],
            check=True, capture_output=True, env=sync_env,
        )
    except subprocess.CalledProcessError as e:
        # git writes some diagnostics (e.g. "nothing to commit") to stdout, so
        # fall back to it when stderr is empty; never fail on odd encodings.
        detail = (e.stderr or e.stdout or b"").decode("utf-8", errors="replace").strip()
        print(
            f"WARN: git commit failed for writeback: {detail}",
            file=sys.stderr,
        )
        return False
    except OSError as e:
        # git could not be started at all (missing binary, bad permissions…).
        print(
            f"WARN: git commit failed for writeback: {e}",
            file=sys.stderr,
        )
        return False
    return True
# ---------------------------------------------------------------------------
# Worker orientation brief (.custodian-brief.md)
# ---------------------------------------------------------------------------
# First line of every generated brief; marks the file as machine-written.
_BRIEF_HEADER = "<!-- custodian-brief: generated by fix-consistency — do not edit manually -->"

# Bullet marker shown in front of a task, keyed by task status.
# NOTE(review): done / cancelled / in_progress map to empty strings here —
# confirm that is intended and not glyphs lost in transit.
_TASK_STATUS_ICON = dict(
    done="",
    cancelled="",
    in_progress="",
    blocked="!",
    todo="·",
)

# Task statuses that count as "open" when listing tasks in the brief.
_OPEN_STATUSES = {"todo", "in_progress", "blocked"}
def _write_custodian_brief(api_base: str, repo_slug: str, repo_path: str) -> bool:
    """Render ``.custodian-brief.md`` at the repo root and commit it when it changed.

    The brief is an offline orientation page: domain, current goal, and for
    every active workstream its progress plus a short list of open tasks.
    It is built from the hub API (``/repos``, ``/workstreams``, ``/topics``,
    ``/repo-goals``, ``/tasks``).

    Returns:
        False when the repo record cannot be fetched or when the new content
        equals the existing file apart from the "Last synced" line; True once
        the file has been rewritten (the commit result itself is not checked).
    """
    import datetime as _dt
    from datetime import timezone as _tz

    repo = _api_get(api_base, f"/repos/{repo_slug}")
    if not repo:
        return False
    repo_id: str = repo.get("id", "")

    workstreams = _api_get(api_base, "/workstreams", {"repo_id": repo_id, "status": "active"}) or []
    has_active = isinstance(workstreams, list) and bool(workstreams)

    # Domain slug: taken from the topic of the first active workstream, or of
    # any workstream when none is active, so a fully-completed repo does not
    # degrade to "(unknown)".
    if has_active:
        domain_candidates = workstreams
    else:
        every_ws = _api_get(api_base, "/workstreams", {"repo_id": repo_id}) or []
        domain_candidates = every_ws if isinstance(every_ws, list) else []
    domain_slug: str = ""
    if domain_candidates:
        topic = _api_get(api_base, f"/topics/{domain_candidates[0].get('topic_id', '')}")
        if topic:
            domain_slug = topic.get("domain_slug", "")

    # Current goal: the first goal whose status is "active", title preferred.
    goal_text = ""
    goals = _api_get(api_base, "/repo-goals", {"repo_slug": repo_slug}) or []
    if isinstance(goals, list):
        live_goals = [goal for goal in goals if goal.get("status") == "active"]
        if live_goals:
            chosen = live_goals[0]
            goal_text = chosen.get("title", "") or chosen.get("description", "")

    ts = _dt.datetime.now(_tz.utc).strftime("%Y-%m-%d %H:%M UTC")

    lines = [
        _BRIEF_HEADER,
        f"# Custodian Brief — {repo_slug}",
        "",
        f"**Domain:** {domain_slug or '(unknown)'} ",
        f"**Last synced:** {ts} ",
        "**State Hub:** http://127.0.0.1:8000 *(adjust if running on a remote machine)*",
        "",
    ]
    if goal_text:
        lines.extend(["## Current Goal", "", goal_text, ""])

    if has_active:
        # At most this many open tasks are listed per workstream.
        shown_cap = 7
        # Listing order: blocked first, then in_progress, then todo.
        rank = {"blocked": 0, "in_progress": 1, "todo": 2}

        lines.append("## Active Workstreams")
        for ws in workstreams:
            ws_title = ws.get("title", ws.get("slug", "?"))
            ws_id = ws["id"]
            tasks = _api_get(api_base, "/tasks", {"workstream_id": ws_id}) or []
            if not isinstance(tasks, list):
                tasks = []

            closed_count = sum(1 for t in tasks if t.get("status") in ("done", "cancelled"))
            total = len(tasks)
            progress = f"{closed_count}/{total}" if total else "no tasks"

            # Stable sort: tasks of equal rank keep the API's order.
            pending = sorted(
                [t for t in tasks if t.get("status") in _OPEN_STATUSES],
                key=lambda t: rank.get(t.get("status", "todo"), 9),
            )

            lines.extend([
                "",
                f"### {ws_title}",
                f"Progress: {progress} done | workstream_id: `{ws_id}`",
            ])
            if not pending:
                continue

            lines.append("")
            lines.append("**Open tasks:**")
            for t in pending[:shown_cap]:
                icon = _TASK_STATUS_ICON.get(t.get("status", "todo"), "·")
                title = t.get("title", t["id"])
                tid = t["id"]
                entry = f"- {icon} {title} `{tid[:8]}`"
                reason = t.get("blocking_reason", "")
                if t.get("status", "") == "blocked" and reason:
                    entry += f"\n *(blocked: {reason})*"
                lines.append(entry)
            if len(pending) > shown_cap:
                lines.append(f"- … and {len(pending) - shown_cap} more open tasks")
    else:
        lines.extend(["## Active Workstreams", "", "*(none — repo may need first-session setup)*"])

    lines.extend([
        "",
        "---",
        "## MCP Orientation (when available)",
        "",
        "If the state-hub MCP server is reachable, call:",
        f'`get_domain_summary("{domain_slug}")`',
        "This provides richer cross-domain context.",
        "If the MCP call fails, use this file as your orientation source.",
    ])
    content = "\n".join(lines) + "\n"

    brief_path = Path(repo_path) / ".custodian-brief.md"
    existing = brief_path.read_text(encoding="utf-8") if brief_path.exists() else ""

    def _without_timestamp(text: str) -> str:
        # The timestamp changes every run; ignore it when deciding to rewrite.
        kept = [ln for ln in text.splitlines() if not ln.startswith("**Last synced:**")]
        return "\n".join(kept)

    if _without_timestamp(content) == _without_timestamp(existing):
        return False  # no meaningful change

    brief_path.write_text(content, encoding="utf-8")
    # Commit the brief so remote workers can pull it.
    _git_commit_writeback(
        repo_path,
        brief_path,
        [f"update .custodian-brief.md for {repo_slug}"],
    )
    return True
# ---------------------------------------------------------------------------
# Fix engine
# ---------------------------------------------------------------------------
def _fix_result_ok(result) -> bool:
    """True when an API helper returned a payload that carries no ``_error`` key."""
    return result is not None and "_error" not in result


def _fix_create_linked_task(
    api_base: str, ws_id: str, task: dict, wp_file: Path
) -> tuple[str | None, str | None]:
    """Create a DB task for one workplan task block and write its DB id into the file.

    Unknown status / priority values fall back to ``todo`` / ``medium``.

    Returns:
        ``(db_task_id, None)`` on success, ``(None, error)`` when the API
        answered with an ``_error`` payload, and ``(None, None)`` when the API
        returned nothing at all.
    """
    t_id = str(task.get("id", "")).strip()
    t_status = str(task.get("status", "todo")).strip()
    if t_status not in VALID_TASK_STATUSES:
        t_status = "todo"
    t_priority = str(task.get("priority", "medium")).strip()
    if t_priority not in VALID_TASK_PRIORITIES:
        t_priority = "medium"
    t_data = _api_post(api_base, "/tasks", {
        "workstream_id": ws_id,
        "title": str(task.get("title", t_id)).strip() or t_id,
        "status": t_status,
        "priority": t_priority,
        "assignee": task.get("assignee") or None,
    })
    if not t_data:
        return None, None
    if "_error" in t_data:
        return None, str(t_data["_error"])
    t_db_id = t_data["id"]
    # Prefer writing the id into the task block; fall back to the frontmatter
    # list when the block could not be patched.
    injected = _inject_task_id_into_block(wp_file, "state_hub_task_id", t_db_id, t_id)
    if not injected:
        _inject_task_id_frontmatter_list(wp_file, t_db_id, t_id)
    return t_db_id, None


def fix_repo(
    api_base: str,
    repo_slug: str,
    repo_path_override: str | None = None,
    no_writeback: bool = False,
) -> ConsistencyReport:
    """Run checks, then apply every auto-fixable issue. Returns the updated report.

    Stages, in order:
      1. ``check_repo()`` and registration of this host's repo path.
      2. C-16 pull gate: return early when the local repo is behind its remote.
      3. C-17 backlog guard: push pending commits first; return early if that fails.
      4. Apply each fixable issue; an exception in one fix is recorded in
         ``report.fixes_applied`` and does not stop the others.
      5. Stamp ``last_state_synced_at``, refresh the custodian brief, push.

    Args:
        api_base: State Hub API base URL.
        repo_slug: Registered repo slug.
        repo_path_override: Local path passed through to ``check_repo()``.
        no_writeback: When True, C-15 (DB→file status writeback) is skipped.
    """
    report = check_repo(api_base, repo_slug, repo_path_override)

    # Auto-register this machine's path in host_paths so future runs work
    # without --repo-path. Idempotent: skipped when already correct.
    repo_path = report.repo_path
    if repo_path:
        repo_record = _api_get(api_base, f"/repos/{repo_slug}")
        if repo_record:
            hostname = socket.gethostname()
            if (repo_record.get("host_paths") or {}).get(hostname) != repo_path:
                result = _api_post(
                    api_base, f"/repos/{repo_slug}/paths",
                    {"host": hostname, "path": repo_path},
                )
                if result and "_error" not in result:
                    report.fixes_applied.append(
                        f"host_paths[{hostname}] → {repo_path}"
                    )

    # T02 — pull gate: warn and skip all write operations when local repo is
    # behind its remote tracking branch.
    if repo_path and _detect_behind_remote(repo_path):
        report.add(
            severity="WARN", check_id="C-16",
            message=(
                f"Repo '{repo_slug}' is behind its remote tracking branch — "
                f"pull before fixing to avoid clobbering remote progress. "
                f"Run: git -C {repo_path} pull --ff-only"
            ),
            fixable=False,
        )
        report.fixes_applied.append(
            "C-16: all write operations skipped — local repo is behind remote"
        )
        return report

    # C-17: backlog guard — if local has unpushed commits from a prior failed push,
    # try to push them before making more. Skipping writes prevents runaway divergence.
    if repo_path:
        ahead = _detect_ahead_of_remote(repo_path)
        if ahead > 0:
            push_ok, push_msg = _git_push(repo_path)
            if not push_ok:
                report.add(
                    severity="WARN", check_id="C-17",
                    message=(
                        f"Repo '{repo_slug}' has {ahead} unpushed commit(s) and push "
                        f"failed ({push_msg}) — skipping writes to prevent runaway divergence"
                    ),
                    fixable=False,
                )
                report.fixes_applied.append(
                    "C-17: all write operations skipped — unpushed commits, push failed"
                )
                return report
            # Push succeeded — local is now in sync; proceed normally
            report.fixes_applied.append(f"C-17 cleared: pushed {ahead} backlogged commit(s)")

    fixable = [i for i in report.issues if i.fixable]
    for issue in fixable:
        ctx = issue._fix_context
        try:
            if issue.check_id in ("C-04", "C-05", "C-13", "C-19"):
                # Single-field workstream patch; the field and value come from the check.
                ws_id = ctx["ws_id"]
                result = _api_patch(api_base, f"/workstreams/{ws_id}",
                                    {ctx["field"]: ctx["value"]})
                if _fix_result_ok(result):
                    report.fixes_applied.append(
                        f"{issue.check_id} fixed: workstream {ws_id[:8]}… "
                        f"{ctx['field']} → {ctx['value']!r}"
                    )
                elif result is not None:
                    report.fixes_applied.append(
                        f"{issue.check_id} FAILED: workstream {ws_id[:8]}… "
                        f"{ctx['field']} → {ctx['value']!r}: {result['_error']}"
                    )

            elif issue.check_id == "C-06":
                # Unlinked workplan: create the workstream, write its id into
                # the file, then create and link every task block.
                wp_file = Path(ctx["wp_file"])
                meta = ctx["meta"]
                domain = ctx["domain"]
                repo_id_val = ctx["repo_id"]
                body = ctx.get("body", "")
                wp_id = str(meta.get("id", "")).strip()
                title = str(meta.get("title", "")).strip()
                status = str(meta.get("status", "active")).strip()
                if status not in ("active", "completed", "archived"):
                    status = "active"

                # Find topic_id for this domain (first topic with a matching slug)
                topics = _api_get(api_base, "/topics")
                topic_id = None
                if isinstance(topics, list):
                    for t in topics:
                        if t.get("domain_slug") == domain:
                            topic_id = t["id"]
                            break
                if topic_id is None:
                    report.fixes_applied.append(
                        f"C-06 SKIP {wp_id}: no topic found for domain '{domain}'"
                    )
                    continue

                slug = re.sub(r"[^a-z0-9-]", "-", wp_id.lower()).strip("-")
                ws_data = _api_post(api_base, "/workstreams", {
                    "topic_id": topic_id,
                    "repo_id": repo_id_val,
                    "slug": slug,
                    "title": title or wp_id,
                    "status": status,
                    "owner": str(meta.get("owner", "")).strip() or None,
                    "planning_priority": str(meta.get("planning_priority", "")).strip() or None,
                    "planning_order": _as_int_or_none(meta.get("planning_order")),
                })
                if not _fix_result_ok(ws_data):
                    # Covers both "no response" and an ``_error`` payload, which
                    # previously surfaced as an opaque KeyError on ``["id"]``.
                    detail = f" ({ws_data['_error']})" if ws_data is not None else ""
                    report.fixes_applied.append(
                        f"C-06 FAIL {wp_id}: could not create workstream in DB{detail}"
                    )
                    continue

                new_ws_id = ws_data["id"]
                _add_frontmatter_field(wp_file, "state_hub_workstream_id", new_ws_id)
                report.fixes_applied.append(
                    f"C-06 fixed: created workstream {new_ws_id[:8]}… "
                    f"for {wp_id}, wrote ID to {wp_file.name}"
                )

                # Create tasks and inject IDs
                for task in get_tasks_from_workplan(meta, body):
                    if task.get("_parse_error"):
                        continue
                    t_id = str(task.get("id", "")).strip()
                    if not t_id:
                        continue
                    t_db_id, error = _fix_create_linked_task(
                        api_base, new_ws_id, task, wp_file
                    )
                    if t_db_id is not None:
                        report.fixes_applied.append(f" + task {t_id} → {t_db_id[:8]}…")
                    elif error is not None:
                        # One failed task must not abort the remaining ones.
                        report.fixes_applied.append(f" ! task {t_id} FAILED: {error}")

            elif issue.check_id == "C-09":
                ws_id = ctx["ws_id"]
                correct_repo_id = ctx["correct_repo_id"]
                result = _api_patch(api_base, f"/workstreams/{ws_id}",
                                    {"repo_id": correct_repo_id})
                if _fix_result_ok(result):
                    report.fixes_applied.append(
                        f"C-09 fixed: workstream {ws_id[:8]}… "
                        f"repo_id → {correct_repo_id[:8]}"
                    )
                elif result is not None:
                    report.fixes_applied.append(
                        f"C-09 FAILED: workstream {ws_id[:8]}…: {result['_error']}"
                    )

            elif issue.check_id == "C-20":
                from_workstream_id = ctx["from_workstream_id"]
                body = {
                    "to_workstream_id": ctx.get("to_workstream_id"),
                    "to_task_id": ctx.get("to_task_id"),
                    "relationship_type": ctx["relationship_type"],
                }
                result = _api_post(api_base, f"/workstreams/{from_workstream_id}/dependencies", body)
                if _fix_result_ok(result):
                    # Either target may be absent from the context; never slice None.
                    target = ctx.get("to_workstream_id") or ctx.get("to_task_id") or "?"
                    report.fixes_applied.append(
                        f"C-20 fixed: dependency {from_workstream_id[:8]}… "
                        f"{ctx['relationship_type']} → {str(target)[:8]}"
                    )
                elif result is not None:
                    report.fixes_applied.append(
                        f"C-20 FAILED: {result['_error']}"
                    )

            elif issue.check_id == "C-10":
                task_id = ctx["task_id"]
                status = ctx["status"]
                result = _api_patch(api_base, f"/tasks/{task_id}",
                                    {"status": status})
                if _fix_result_ok(result):
                    report.fixes_applied.append(
                        f"C-10 fixed: task {task_id[:8]}… status → {status!r}"
                    )
                elif result is not None:
                    report.fixes_applied.append(
                        f"C-10 FAILED: task {task_id[:8]}… status → {status!r}: "
                        f"{result['_error']}"
                    )

            elif issue.check_id == "C-11":
                ws_id = ctx["ws_id"]
                ws_status = ctx.get("ws_status", "")
                task = ctx["task"]
                wp_file = Path(ctx["wp_file"])
                t_id = str(task.get("id", "")).strip()

                # Skip creating tasks for finished workstreams — the workstream is
                # done/archived so unlinked tasks are stale file artefacts, not gaps.
                if ws_status in ("completed", "archived"):
                    report.fixes_applied.append(
                        f"C-11 skipped: task '{t_id}' in {ws_status} workstream — not created"
                    )
                else:
                    t_db_id, error = _fix_create_linked_task(api_base, ws_id, task, wp_file)
                    if t_db_id is not None:
                        report.fixes_applied.append(
                            f"C-11 fixed: task '{t_id}' → {t_db_id[:8]}…"
                        )
                    elif error is not None:
                        report.fixes_applied.append(
                            f"C-11 FAILED: task '{t_id}': {error}"
                        )

            elif issue.check_id == "C-12":
                # Orphan DB tasks are only cancelled when their workstream is finished.
                task_id = ctx["task_id"]
                if ctx.get("ws_finished"):
                    result = _api_patch(api_base, f"/tasks/{task_id}", {"status": "cancelled"})
                    if _fix_result_ok(result):
                        report.fixes_applied.append(
                            f"C-12 fixed: orphan task {task_id[:8]}… cancelled (workstream finished)"
                        )
                    elif result is not None:
                        report.fixes_applied.append(
                            f"C-12 FAILED: orphan task {task_id[:8]}…: {result['_error']}"
                        )

            elif issue.check_id == "C-15":
                # T03 — writeback: DB is ahead of file — patch file to match DB.
                if no_writeback:
                    report.fixes_applied.append(
                        f"C-15 skipped (--no-writeback): task '{ctx['task_block_id']}' "
                        f"db={ctx['db_status']!r}"
                    )
                else:
                    wp_file = Path(ctx["wp_file"])
                    task_block_id = ctx["task_block_id"]
                    db_status = ctx["db_status"]
                    old_status = issue.file_value
                    if _patch_task_status_in_file(wp_file, task_block_id, db_status):
                        committed = _git_commit_writeback(
                            repo_path,
                            wp_file,
                            [f"{task_block_id}: {old_status} → {db_status}"],
                        )
                        suffix = " (committed)" if committed else " (file patched, commit failed)"
                        report.fixes_applied.append(
                            f"C-15 fixed: task '{task_block_id}' "
                            f"{old_status} → {db_status}{suffix}"
                        )
                    else:
                        report.fixes_applied.append(
                            f"C-15 SKIP: could not locate task block '{task_block_id}' "
                            f"in {wp_file.name}"
                        )
        except Exception as e:
            # Best effort: record the failure and continue with the next issue.
            report.fixes_applied.append(f"{issue.check_id} ERROR: {e}")

    # Record that a sync run happened for this repo
    from datetime import timezone as _tz
    import datetime as _dt
    now_iso = _dt.datetime.now(_tz.utc).isoformat()
    _api_patch(api_base, f"/repos/{repo_slug}", {"last_state_synced_at": now_iso})

    # Write the worker orientation brief (.custodian-brief.md)
    if repo_path:
        brief_written = _write_custodian_brief(api_base, repo_slug, repo_path)
        if brief_written:
            report.fixes_applied.append("brief: .custodian-brief.md updated")

    # Push all commits made this run (writebacks + brief) to close the loop.
    # This ensures the next run starts with local == remote, making the
    # timer idempotent and preventing commit pile-up.
    if repo_path:
        push_ok, push_msg = _git_push(repo_path)
        if push_ok:
            report.fixes_applied.append(f"push: {push_msg}")
        else:
            report.fixes_applied.append(f"push WARN: {push_msg}")

    return report
# Check IDs treated as background noise in multi-machine setups.
# C-08 = completed/archived DB workstream with no file (pre-ADR-001 legacy).
# Issues from this set alone never justify a pull+fix cycle.
_BACKGROUND_CHECKS: frozenset[str] = frozenset({"C-08"})


def _report_needs_action(
    report: ConsistencyReport, behind_remote: bool, ahead_of_remote: int = 0
) -> bool:
    """Decide whether a repo warrants a pull+fix cycle.

    Action is needed as soon as any of these holds:
      - the repo is behind its remote tracking branch,
      - it carries unpushed local commits (``ahead_of_remote > 0``),
      - the report contains FAIL issues,
      - a WARN or INFO issue falls outside ``_BACKGROUND_CHECKS``.
    Otherwise the repo counts as clean and False is returned.
    """
    out_of_sync = behind_remote or ahead_of_remote > 0
    if out_of_sync or report.failures:
        return True
    return any(
        issue.check_id not in _BACKGROUND_CHECKS
        for issue in report.warnings + report.infos
    )
def fix_all_remote(
    api_base: str,
    no_writeback: bool = False,
    max_seconds: int = DEFAULT_REMOTE_ALL_MAX_SECONDS,
) -> list[ConsistencyReport]:
    """Pull-then-fix all registered repos that need attention.

    For each repo with a resolvable local path on this machine:
      1. Skip if the path does not exist (repo lives on another machine).
      2. Run check_repo() plus the behind/ahead-of-remote probes — read-only.
      3. If the repo is clean (no actionable issues, in sync with remote): skip.
      4. Otherwise: git pull --ff-only, then fix_repo().

    Args:
        api_base: State Hub API base URL.
        no_writeback: Forwarded to fix_repo() (disables C-15 writeback).
        max_seconds: Wall-clock budget; once exceeded, the current and all
            remaining repos are skipped. Values <= 0 disable the budget.

    Returns:
        One ConsistencyReport per repo that was actually fixed. Repos skipped
        as clean, missing on this host, or over budget are only listed on
        stdout. An empty list is returned when the repo list cannot be fetched.
    """
    repos = _api_get(api_base, "/repos")
    if not isinstance(repos, list):
        print("ERROR: Could not fetch repos from state-hub API", file=sys.stderr)
        return []

    started = time.monotonic()
    reports: list[ConsistencyReport] = []
    skipped_clean: list[str] = []
    skipped_missing: list[str] = []
    skipped_budget: list[str] = []

    # enumerate() gives the position directly; list.index() would be an O(n)
    # equality search and picks the wrong slot when two records compare equal.
    for position, repo in enumerate(repos):
        slug = repo["slug"]
        if max_seconds > 0 and time.monotonic() - started > max_seconds:
            skipped_budget.extend(r.get("slug", "?") for r in repos[position:])
            break

        # Resolve path using the same priority as check_repo
        path = resolve_repo_path(repo)
        if not path or not Path(path).is_dir():
            skipped_missing.append(slug)
            continue

        # Read-only pass: detect issues and remote staleness.
        # _detect_behind_remote does a fetch, so _detect_ahead_of_remote
        # after it sees an up-to-date tracking branch.
        pre_report = check_repo(api_base, slug)
        behind = _detect_behind_remote(path)
        ahead = _detect_ahead_of_remote(path)
        if not _report_needs_action(pre_report, behind, ahead):
            skipped_clean.append(slug)
            continue

        # Pull before fixing; a failed pull is recorded but does not stop the
        # fix pass (fix_repo applies its own behind-remote gate).
        pull_ok, pull_msg = _git_pull(path)
        pull_tag = f"pull: {pull_msg}"
        if not pull_ok:
            pull_tag = f"pull WARN: {pull_msg} — proceeding anyway"

        report = fix_repo(api_base, slug, no_writeback=no_writeback)
        report.fixes_applied.insert(0, pull_tag)
        reports.append(report)

    # Print clean/missing summary lines before the detailed reports
    if skipped_clean:
        print(f" CLEAN (skipped): {', '.join(skipped_clean)}")
    if skipped_missing:
        print(f" NOT ON THIS HOST (skipped): {', '.join(skipped_missing)}")
    if skipped_budget:
        print(
            f" BUDGET EXHAUSTED after {max_seconds}s (skipped): "
            f"{', '.join(skipped_budget)}"
        )
    if skipped_clean or skipped_missing or skipped_budget:
        print()
    return reports
def archive_closed_workplans(
repo_path: str,
completion_date: str | None = None,
workplan: str | None = None,
) -> list[str]:
"""Move closed root workplans into workplans/archived/ with YYMMDD prefix.
Only root-level files whose frontmatter status normalises to completed or
archived are moved. Files with any open task blocks are left in place.
"""
repo_dir = Path(repo_path)
workplans_dir = repo_dir / "workplans"
archived_dir = workplans_dir / "archived"
if not workplans_dir.is_dir():
return []
date_prefix = completion_date or datetime.now().strftime("%y%m%d")
archived_dir.mkdir(exist_ok=True)
moved: list[str] = []
for wp_file in sorted(workplans_dir.glob("*.md")):
text = wp_file.read_text(encoding="utf-8")
if not text.startswith("---"):
continue
meta, body = parse_frontmatter(text)
if not meta or meta.get("_parse_error"):
continue
if workplan:
wanted = workplan.removesuffix(".md")
if wanted not in {str(meta.get("id", "")), wp_file.stem, wp_file.name}:
continue
status = normalise_workstream_status(str(meta.get("status", "")).strip())
if status not in ("completed", "archived"):
continue
tasks = get_tasks_from_workplan(meta, body)
open_tasks = [
t for t in tasks
if str(t.get("status", "")).strip() not in ("done", "cancelled")
]
if open_tasks:
continue
target = archived_dir / f"{date_prefix}-{canonical_workplan_filename(wp_file)}"
if target.exists():
raise FileExistsError(f"Archived workplan already exists: {target}")
wp_file.rename(target)
moved.append(f"{wp_file.relative_to(repo_dir)} -> {target.relative_to(repo_dir)}")
return moved
# ---------------------------------------------------------------------------
# Output / rendering
# ---------------------------------------------------------------------------
def render_text(report: ConsistencyReport, show_info: bool = True) -> str:
    """Format a consistency report as human-readable text.

    Layout: a header (repo slug and path), one section per severity in the
    order FAIL, WARN, INFO (INFO is omitted when *show_info* is false, empty
    sections are omitted always), the list of applied fixes if any, and a
    footer with counts and the overall result.
    """
    rule = "=" * 66
    out: list[str] = [
        "ADR-001 Consistency Report",
        f"Repo: {report.repo_slug}",
        f"Path: {report.repo_path}",
        rule,
    ]

    for severity in ("FAIL", "WARN", "INFO"):
        matching = [issue for issue in report.issues if issue.severity == severity]
        hidden = severity == "INFO" and not show_info
        if hidden or not matching:
            continue
        out.append(f"\n {severity}S ({len(matching)}):")
        for issue in matching:
            location = f" [{issue.file_path}]" if issue.file_path else ""
            tag = " [fixable]" if issue.fixable else ""
            out.append(f" {issue.check_id}{location}{tag}")
            out.append(f" {issue.message}")
            # Show the file/DB pair only when at least one side has a value.
            if issue.file_value or issue.db_value:
                out.append(f" file={issue.file_value!r} db={issue.db_value!r}")

    applied = report.fixes_applied
    if applied:
        out.append(f"\n FIXES APPLIED ({len(applied)}):")
        out.extend(f" {entry}" for entry in applied)

    out.append(f"\n{rule}")
    fail_count = len(report.failures)
    warn_count = len(report.warnings)
    info_count = len(report.infos)
    out.append(f" {fail_count} fail | {warn_count} warn | {info_count} info")

    if fail_count:
        verdict = " RESULT: ✗ FAIL"
    elif warn_count:
        verdict = " RESULT: ✓ PASS (with warnings)"
    else:
        verdict = " RESULT: ✓ PASS"
    out.append(verdict)
    out.append(rule)
    return "\n".join(out)
def report_to_dict(report: ConsistencyReport) -> dict:
    """Convert a consistency report into a plain dict for JSON output.

    The dict carries the repo slug and path, one entry per issue, the list of
    applied fixes, per-severity counts under ``summary``, and an overall
    ``result`` of ``"fail"``, ``"warn"`` or ``"pass"`` (failures take
    precedence over warnings).
    """
    issue_fields = (
        "severity",
        "check_id",
        "message",
        "file_path",
        "db_id",
        "file_value",
        "db_value",
        "fixable",
    )
    issues = [
        {name: getattr(issue, name) for name in issue_fields}
        for issue in report.issues
    ]

    if report.failures:
        outcome = "fail"
    elif report.warnings:
        outcome = "warn"
    else:
        outcome = "pass"

    counts = {
        "fail": len(report.failures),
        "warn": len(report.warnings),
        "info": len(report.infos),
    }
    return {
        "repo_slug": report.repo_slug,
        "repo_path": report.repo_path,
        "issues": issues,
        "fixes_applied": report.fixes_applied,
        "summary": counts,
        "result": outcome,
    }
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
def main() -> None:
    """CLI entry point: parse arguments, run checks or fixes, print, and exit.

    Modes (mutually exclusive, one required): ``--repo SLUG``, ``--all``,
    ``--here [PATH]``. ``--remote`` implies ``--fix``; combined with ``--all``
    it runs the lock-protected ``fix_all_remote()`` pass.

    Exit status: 1 when any report has failures, 2 when there are only
    warnings, 0 otherwise. ``--remote --all`` exits 0 unless a failure remains,
    and also exits 0 when the lock is held elsewhere or nothing needed fixing.
    """
    parser = argparse.ArgumentParser(
        description="ADR-001 consistency checker — bidirectional file↔DB validation",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--repo", metavar="SLUG",
                       help="Registered repo slug (e.g. the-custodian)")
    group.add_argument("--all", action="store_true",
                       help="Run checks against all repos with a resolvable path on this host")
    group.add_argument("--here", metavar="PATH", nargs="?", const="",
                       help="Infer repo slug from git remote URL at PATH (default: CWD)")
    parser.add_argument("--fix", action="store_true",
                        help="Apply auto-fixable issues (status drift, repo mismatch, etc.)")
    parser.add_argument("--remote", action="store_true",
                        help="Pull each repo before fixing; when used with --all, skips repos "
                             "that are already clean (no actionable issues, not behind remote). "
                             "Implies --fix.")
    parser.add_argument("--max-seconds", type=int, default=DEFAULT_REMOTE_ALL_MAX_SECONDS,
                        help="Wall-clock budget for --remote --all before remaining repos are skipped "
                             f"(default: {DEFAULT_REMOTE_ALL_MAX_SECONDS}; 0 disables)")
    parser.add_argument("--no-writeback", action="store_true", dest="no_writeback",
                        help="Disable DB→file status writeback (C-15) while keeping other fixes")
    parser.add_argument("--archive-closed", action="store_true",
                        help="Move closed root workplans to workplans/archived/YYMMDD-*.md")
    parser.add_argument("--archive-workplan", metavar="ID_OR_FILE", default=None,
                        help="When archiving, only move the matching workplan id or filename")
    parser.add_argument("--archive-date", metavar="YYMMDD", default=None,
                        help="Completion date prefix for --archive-closed (default: today)")
    parser.add_argument("--repo-path", metavar="PATH", default=None,
                        help="Override the local repo path (useful when the DB has a different "
                             "machine's path). Takes priority over host_paths and local_path.")
    parser.add_argument("--api-base", default="http://127.0.0.1:8000",
                        help="State Hub API base URL")
    parser.add_argument("--json", action="store_true", dest="as_json",
                        help="Output JSON instead of human-readable text")
    args = parser.parse_args()

    import os as _os

    no_wb = getattr(args, "no_writeback", False)
    do_fix = args.fix or args.remote

    # --here: infer slug from git remote URL, then run as single-repo check/fix
    if args.here is not None:
        search_path = args.here or _os.getcwd()
        inferred = _infer_slug_from_path(args.api_base, search_path)
        if inferred is None:
            print(
                f"ERROR: No registered repo with a matching remote_url found at '{search_path}'.",
                file=sys.stderr,
            )
            print(
                " Register the repo first: POST /repos/ with remote_url set.",
                file=sys.stderr,
            )
            sys.exit(1)
        inferred_slug, git_root = inferred
        print(f" Detected: {inferred_slug} ({git_root})")
        if do_fix:
            reports = [fix_repo(args.api_base, inferred_slug, git_root, no_writeback=no_wb)]
        else:
            reports = [check_repo(args.api_base, inferred_slug, git_root)]
        if args.archive_closed:
            moved = archive_closed_workplans(git_root, args.archive_date, args.archive_workplan)
            reports[0].fixes_applied.extend(f"archive: {m}" for m in moved)

    # --remote --all: smart pull+fix across all repos
    elif args.remote and args.all:
        with run_lock("consistency-remote-all") as acquired:
            if not acquired:
                print(
                    "SKIP: another fix-consistency-remote --all run is already active",
                    file=sys.stderr,
                )
                sys.exit(0)
            reports = fix_all_remote(
                args.api_base,
                no_writeback=no_wb,
                max_seconds=args.max_seconds,
            )
        if not reports:
            sys.exit(0)

    else:
        # Resolve repo list
        hostname = socket.gethostname()
        repo_slugs: list[str] = []
        if args.all:
            repos = _api_get(args.api_base, "/repos")
            if not isinstance(repos, list):
                print("ERROR: Could not fetch repos from state-hub API", file=sys.stderr)
                sys.exit(1)
            repo_slugs = [
                r["slug"] for r in repos
                if r.get("local_path") or (r.get("host_paths") or {}).get(hostname)
            ]
            if not repo_slugs:
                print(
                    f"No repos with a path registered for host '{hostname}'.",
                    file=sys.stderr,
                )
                sys.exit(0)
        else:
            repo_slugs = [args.repo]

        # --repo-path only applies to single-repo runs; silently ignored with --all
        path_override = args.repo_path if not args.all else None

        if args.remote:
            # Single-repo remote: pull first, then fix
            slug = repo_slugs[0]
            repo = _api_get(args.api_base, f"/repos/{slug}")
            path = resolve_repo_path(repo or {}, path_override) if repo else (path_override or "")
            if path:
                pull_ok, pull_msg = _git_pull(path)
                prefix = "pull" if pull_ok else "pull WARN"
                print(f" {prefix}: {pull_msg}")
            report = fix_repo(args.api_base, slug, path_override, no_writeback=no_wb)
            reports = [report]
        elif do_fix:
            reports = [
                fix_repo(args.api_base, slug, path_override, no_writeback=no_wb)
                for slug in repo_slugs
            ]
        else:
            reports = [check_repo(args.api_base, slug, path_override) for slug in repo_slugs]

        if args.archive_closed:
            for report in reports:
                # A report without a resolved path must not be archived:
                # Path("") is the current directory, so unrelated workplans in
                # the CWD would be moved.
                if not report.repo_path:
                    continue
                moved = archive_closed_workplans(report.repo_path, args.archive_date, args.archive_workplan)
                report.fixes_applied.extend(f"archive: {m}" for m in moved)

    if args.as_json:
        # A single report is emitted as an object, several as a list.
        output = (
            report_to_dict(reports[0])
            if len(reports) == 1
            else [report_to_dict(r) for r in reports]
        )
        print(json.dumps(output, indent=2))
    else:
        for report in reports:
            print(render_text(report))
            print()

    any_fail = any(r.failures for r in reports)
    any_warn = any(r.warnings for r in reports)
    # The --remote --all pass only signals hard failures through its exit
    # status; warnings alone still exit 0.
    if args.remote and args.all and not any_fail:
        sys.exit(0)
    sys.exit(1 if any_fail else 2 if any_warn else 0)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()