Files
state-hub/scripts/consistency_check.py

2350 lines
94 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""consistency_check.py — ADR-001 consistency checking engine.
Runs bidirectional checks between workplan files in a registered repo and the
state-hub DB. The file is always authoritative; the DB is the cache/index layer.
Checks (columns: ID, name, severity, fixable, description):
C-01 workplans-dir FAIL No workplans/ directory missing
C-02 workplan-parse FAIL No Workplan file cannot be parsed
C-03 workstream-stale-ref FAIL No state_hub_workstream_id in file not in DB
C-04 workstream-status-drift WARN Yes File status != DB status (file wins)
C-05 workstream-title-drift WARN Yes File title != DB title (file wins)
C-06 workstream-unlinked WARN Yes Workplan has no state_hub_workstream_id
C-07 orphan-db-active FAIL No Active DB workstream, no backing file
C-08 orphan-db-closed INFO No Finished/archived DB workstream, no file
C-09 workstream-repo-mismatch FAIL Yes DB workstream repo_id != file location
C-10 task-status-drift WARN Yes Task status differs between file and DB
C-11 task-unlinked WARN Yes Task block has no state_hub_task_id
C-12 orphan-db-task WARN No DB task in workstream has no file backing
C-13 workstream-auto-complete WARN Yes All DB tasks done but workstream still active
C-14 ghost-duplicate WARN No Active topic workstream with no repo_id matches a file-backed title — probable ghost from premature create_workstream() call
C-15 task-db-ahead WARN Yes DB task status is ahead of file — regression prevented; writeback syncs file
C-16 repo-behind-remote WARN No Local repo is behind remote tracking branch — --fix skipped to avoid clobbering remote progress
C-17 repo-ahead-push-failed WARN No Local repo has unpushed commits and push failed — writes skipped to prevent runaway divergence
C-19 workstream-planning-drift WARN Yes planning_priority/planning_order differs between file and DB
C-20 workstream-dependency-missing WARN Yes Workplan dependency frontmatter missing from DB graph
C-22 task-description-drift WARN Yes Task description/content differs between file and DB
C-23 workstream-active-task-planning-status WARN Yes Workstream/workplan is planning while a task is progress or wait
Usage:
python scripts/consistency_check.py --repo SLUG [--fix] [--no-writeback] [--json] [--api-base URL]
python scripts/consistency_check.py --all [--fix] [--no-writeback] [--json] [--api-base URL]
python scripts/consistency_check.py --here [PATH] [--fix] [--no-writeback] [--json] [--api-base URL]
Exit codes:
0 — clean (no FAILs or WARNs; INFOs are allowed)
1 — one or more FAILs present
2 — warnings-only strict CLI result (no FAILs, but WARNs present)
Agent/operator Make wrappers normalize exit code 2 to shell success while
preserving visible warning output. Use the direct script when a machine caller
needs to distinguish clean from warnings-only.
"""
from __future__ import annotations
import argparse
import os
import json
import re
import socket
import subprocess
import sys
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any
_REPO_ROOT = Path(__file__).resolve().parent.parent
if str(_REPO_ROOT) not in sys.path:
sys.path.insert(0, str(_REPO_ROOT))
from api.workplan_status import ( # noqa: E402
CANONICAL_WORKSTREAM_STATUSES,
CLOSED_WORKSTREAM_STATUSES,
LEGACY_WORKSTREAM_STATUS_ALIASES,
OPEN_WORKSTREAM_STATUSES,
SUPPORTED_WORKSTREAM_STATUSES,
normalize_workstream_status as _normalize_workstream_status,
ready_review_status,
)
from api.services.lifecycle import should_activate_parent_for_active_tasks # noqa: E402
from api.task_status import ( # noqa: E402
CANONICAL_TASK_STATUSES,
OPEN_TASK_STATUSES,
TASK_STATUS_ORDER,
TERMINAL_TASK_STATUSES,
normalize_task_status,
)
try:
import yaml as _yaml
_HAS_YAML = True
except ImportError:
_HAS_YAML = False
try:
import httpx as _httpx
_HAS_HTTPX = True
except ImportError:
_HAS_HTTPX = False
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Fenced ```task blocks; group 1 captures the text between the opening and closing fences.
_TASK_BLOCK_RE = re.compile(r"```task\s*\n(.*?)\n```", re.DOTALL)
# Markdown headings of level 1-4 at line start; group 1 is the '#' run, group 2 the heading text.
_HEADING_RE = re.compile(r"^(#{1,4})\s+(.+?)$", re.MULTILINE)
# Archived workplan filename: six digits and a dash prefixed to the original "<name>.md" (group 1).
_ARCHIVED_WP_RE = re.compile(r"^\d{6}-(.+\.md)$")
# Set copies of the status vocabularies imported from the api package.
VALID_WP_STATUSES = set(CANONICAL_WORKSTREAM_STATUSES)
SUPPORTED_WP_STATUSES = set(SUPPORTED_WORKSTREAM_STATUSES)
VALID_TASK_STATUSES = set(CANONICAL_TASK_STATUSES)
VALID_TASK_PRIORITIES = {"low", "medium", "high", "critical"}
VALID_DEP_RELATIONSHIPS = {"blocks", "starts_after", "informs", "soft_dependency"}
# Read from the environment once at import time; a non-integer value raises ValueError on import.
# NOTE(review): presumably a time budget in seconds for the remote --all mode — confirm at the use site.
DEFAULT_REMOTE_ALL_MAX_SECONDS = int(os.environ.get("CONSISTENCY_REMOTE_ALL_MAX_SECONDS", "300"))
# Legacy file/API aliases translated before comparison and PATCHing.
FILE_TO_DB_WORKSTREAM_STATUS: dict[str, str] = dict(LEGACY_WORKSTREAM_STATUS_ALIASES)
# Ordinal ranking for task statuses used by the no-regress rule (T01/C-15).
STATUS_ORDER: dict[str, int] = dict(TASK_STATUS_ORDER)
def normalise_workstream_status(status: str, *, has_started: bool | None = None) -> str:
    """Map a workplan file status onto its DB-canonical workstream status.

    Thin wrapper around the normaliser imported from ``api.workplan_status``;
    ``has_started`` is forwarded unchanged.
    """
    canonical = _normalize_workstream_status(status, has_started=has_started)
    return canonical
def normalise_task_status(status: Any, *, default: str = "todo") -> str:
    """Normalise a task status, falling back to ``default`` when it is rejected.

    Delegates to ``normalize_task_status``; a ``ValueError`` raised by that
    helper is swallowed and ``default`` is returned instead.
    """
    try:
        canonical = normalize_task_status(status, default=default)
    except ValueError:
        canonical = default
    return canonical
def canonical_workplan_filename(path: Path) -> str:
    """Return the file name of ``path`` with any archive date prefix removed.

    Names that do not match the archived pattern (six digits, a dash, then a
    ``.md`` name) are returned unchanged.
    """
    name = path.name
    return _ARCHIVED_WP_RE.sub(lambda found: found.group(1), name)
def workplan_display_path(repo_dir: Path, path: Path) -> str:
    """Return ``path`` relative to ``repo_dir`` for use in reports.

    When ``path`` does not live under ``repo_dir`` only its bare file name is
    returned.
    """
    try:
        relative = path.relative_to(repo_dir)
    except ValueError:
        # Not inside the repo: fall back to the file name alone.
        return path.name
    return str(relative)
def iter_workplan_files(workplans_dir: Path, include_archived: bool = True) -> list[Path]:
    """List ``*.md`` workplans in ``workplans_dir``, then those in ``archived/``.

    Root-level files come first (sorted), followed by the sorted contents of
    the ``archived`` subdirectory when ``include_archived`` is true and that
    directory exists.
    """
    root_files = sorted(workplans_dir.glob("*.md"))
    if not include_archived:
        return root_files
    archive = workplans_dir / "archived"
    if not archive.is_dir():
        return root_files
    return root_files + sorted(archive.glob("*.md"))
# ---------------------------------------------------------------------------
# Data types
# ---------------------------------------------------------------------------
@dataclass
class Issue:
    """One finding produced by a consistency check."""

    severity: str  # FAIL | WARN | INFO
    check_id: str  # check identifier such as "C-04"; this file emits C-00 through C-23
    message: str  # human-readable description of the finding
    file_path: str = ""  # display path of the workplan; task findings append "#<task id>"
    db_id: str = ""  # id of the DB workstream or task the finding refers to, when there is one
    # NOTE(review): annotated str, but the planning-drift checks pass None or int here.
    file_value: str = ""
    db_value: str = ""
    fixable: bool = False  # whether the finding is marked as automatically repairable
    # Extra data recorded alongside the finding for the repair step; excluded from repr().
    _fix_context: dict = field(default_factory=dict, repr=False)
@dataclass
class ConsistencyReport:
    """Collects the issues found, and the fixes applied, for a single repo."""

    repo_slug: str
    repo_path: str
    issues: list[Issue] = field(default_factory=list)
    fixes_applied: list[str] = field(default_factory=list)

    def add(self, **kwargs) -> Issue:
        """Create an ``Issue`` from ``kwargs``, record it, and return it."""
        created = Issue(**kwargs)
        self.issues.append(created)
        return created

    @property
    def failures(self) -> list[Issue]:
        """Issues whose severity is ``FAIL``, in insertion order."""
        return list(filter(lambda item: item.severity == "FAIL", self.issues))

    @property
    def warnings(self) -> list[Issue]:
        """Issues whose severity is ``WARN``, in insertion order."""
        return list(filter(lambda item: item.severity == "WARN", self.issues))

    @property
    def infos(self) -> list[Issue]:
        """Issues whose severity is ``INFO``, in insertion order."""
        return list(filter(lambda item: item.severity == "INFO", self.issues))
@dataclass(frozen=True)
class RenormalizationRule:
    """Immutable descriptive metadata for one lifecycle renormalization check."""

    check_id: str  # check identifier, e.g. "C-23"
    invariant: str  # the rule that must hold, stated in prose
    detection: str  # prose description of the condition that violates the invariant
    repair: str  # prose description of the repair that restores the invariant
    test_anchor: str  # test location covering the rule, e.g. "tests/<file>.py::<TestClass>"
# Registry of renormalization rules; currently a single entry describing C-23.
RENORMALIZATION_RULES: tuple[RenormalizationRule, ...] = (
    RenormalizationRule(
        check_id="C-23",
        invariant="Planning-state workplans cannot contain active task work.",
        detection=(
            "Workplan status is proposed, ready, or backlog while a linked "
            "task is progress or wait."
        ),
        repair="Patch the DB workstream and workplan frontmatter to status=active.",
        test_anchor="tests/test_consistency_check.py::TestLifecycleRenormalization",
    ),
)
# Maintainer checklist, kept as data, for adding the next renormalization guard.
RENORMALIZATION_NEXT_GUARD_CHECKLIST: tuple[str, ...] = (
    "Name the invariant and assign the next C-id.",
    "Add rule metadata to RENORMALIZATION_RULES.",
    "Add detection in check_repo before generic drift rules that could fight it.",
    "Add the repair branch in fix_repo, using file writes only through helpers.",
    "Add detection and repair tests under TestLifecycleRenormalization.",
)
@contextmanager
def run_lock(name: str):
    """Hold a nonblocking process lock for long-running consistency modes.

    Yields ``True`` when the exclusive lock was acquired, or when ``fcntl``
    cannot be imported (in which case no locking is attempted). Yields
    ``False`` when another holder already has the lock.

    The lock file is ``custodian-<name>.lock`` inside ``CONSISTENCY_LOCK_DIR``
    (default ``/tmp``). While held it contains ``"<pid> <UTC timestamp>Z"``.
    """
    try:
        import fcntl
    except ImportError:
        # No fcntl available: run unlocked rather than fail.
        yield True
        return
    from datetime import timezone

    lock_path = Path(os.environ.get("CONSISTENCY_LOCK_DIR", "/tmp")) / f"custodian-{name}.lock"
    lock_path.parent.mkdir(parents=True, exist_ok=True)
    # Open in "a+" so that merely *trying* to take the lock never truncates the
    # file; opening with "w" would erase the current holder's PID/timestamp.
    handle = lock_path.open("a+", encoding="utf-8")
    acquired = False
    try:
        try:
            fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except BlockingIOError:
            yield False
            return
        acquired = True
        # We own the lock now, so it is safe to replace the recorded owner.
        handle.seek(0)
        handle.truncate()
        # Timezone-aware replacement for the deprecated datetime.utcnow();
        # tzinfo is dropped so the written format stays "<iso>Z".
        stamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        handle.write(f"{os.getpid()} {stamp}Z\n")
        handle.flush()
        yield True
    finally:
        if acquired:
            try:
                fcntl.flock(handle, fcntl.LOCK_UN)
            except OSError:
                pass
        handle.close()
# ---------------------------------------------------------------------------
# YAML / frontmatter parsing
# ---------------------------------------------------------------------------
def _parse_yaml_block(raw: str) -> dict:
    """Parse a YAML string into a dict; fall back to key:value lines without PyYAML.

    Returns ``{}`` for empty input and ``{"_parse_error": True}`` when the text
    is not valid YAML or does not describe a mapping. The return value is
    therefore always a dict, so callers can use ``.get`` and item assignment.
    """
    if _HAS_YAML:
        try:
            loaded = _yaml.safe_load(raw)
        except _yaml.YAMLError:
            return {"_parse_error": True}
        if not loaded:
            return {}
        if not isinstance(loaded, dict):
            # Valid YAML that is a bare scalar or list (e.g. "just text" or
            # "- item") is not usable as frontmatter or a task mapping.
            return {"_parse_error": True}
        return loaded
    # Minimal fallback: top-level "key: value" lines only; indented lines
    # (nested structures) are ignored and surrounding quotes are stripped.
    result: dict = {}
    for line in raw.splitlines():
        if ":" in line and not line.startswith(" "):
            k, _, v = line.partition(":")
            result[k.strip()] = v.strip().strip('"').strip("'")
    return result
def parse_frontmatter(text: str) -> tuple[dict, str]:
    """Separate leading YAML frontmatter from the rest of ``text``.

    Returns ``(meta, body)`` where ``body`` is everything after the second
    ``---`` marker. If ``text`` does not start with ``---`` or has no second
    marker, returns ``({}, text)``.
    """
    if not text.startswith("---"):
        return {}, text
    _lead, _opener, remainder = text.partition("---")
    raw_meta, closer, body = remainder.partition("---")
    if not closer:
        # Opening marker without a closing one: treat as no frontmatter.
        return {}, text
    return _parse_yaml_block(raw_meta.strip()), body
def _clean_task_description(raw: str) -> str | None:
    """Strip blank lines and horizontal rules from the edges of a task section.

    Works on a window ``[lo, hi)`` over the lines of ``raw``. Each edge is
    trimmed in one fixed pass: blanks, then rule lines, then blanks again.
    Returns the remaining text, or ``None`` if nothing is left.
    """
    rule_lines = {"---", "***", "___"}
    rows = raw.splitlines()
    lo, hi = 0, len(rows)

    # Leading and trailing blank lines.
    while lo < hi and not rows[lo].strip():
        lo += 1
    while lo < hi and not rows[hi - 1].strip():
        hi -= 1
    # Trailing rule lines, then the blanks they exposed.
    while lo < hi and rows[hi - 1].strip() in rule_lines:
        hi -= 1
    while lo < hi and not rows[hi - 1].strip():
        hi -= 1
    # Leading rule lines, then the blanks they exposed.
    while lo < hi and rows[lo].strip() in rule_lines:
        lo += 1
    while lo < hi and not rows[lo].strip():
        lo += 1

    cleaned = "\n".join(rows[lo:hi]).strip()
    return cleaned if cleaned else None
def parse_task_blocks(body: str) -> list[dict]:
    """Extract task blocks, injecting title and markdown content from the task section.

    Each fenced ```task block is parsed as YAML. A block with no ``title`` key
    takes the text of the nearest preceding heading. The markdown after the
    block becomes ``description``, ending at whichever comes first: the next
    task block, the next heading at the same or a higher level (any heading
    if the task has none before it), or the end of ``body``.

    A block whose YAML is not a mapping is returned as
    ``{"_parse_error": True}`` (plus any injected fields) instead of raising.
    """
    # (position, level, text) for every heading, in document order.
    # NOTE(review): the heading pattern is applied to the whole body, so '#'
    # lines inside fenced blocks are also treated as headings.
    headings = [
        (m.start(), len(m.group(1)), m.group(2).strip())
        for m in _HEADING_RE.finditer(body)
    ]
    task_matches = list(_TASK_BLOCK_RE.finditer(body))
    results = []
    for idx, m in enumerate(task_matches):
        task = _parse_yaml_block(m.group(1).strip())
        if not isinstance(task, dict):
            # A bare scalar or list cannot carry task fields; flag it so that
            # the item assignments below cannot raise TypeError.
            task = {"_parse_error": True}

        # Nearest heading that starts before this block, if any.
        prev_heading = next(
            (entry for entry in reversed(headings) if entry[0] < m.start()),
            None,
        )
        if "title" not in task and prev_heading:
            task["title"] = prev_heading[2]

        content_end = len(body)
        if idx + 1 < len(task_matches):
            content_end = min(content_end, task_matches[idx + 1].start())
        if prev_heading:
            current_level = prev_heading[1]
            # Stop at the next heading of the same or a higher level;
            # deeper sub-headings stay part of this task's description.
            boundary = next(
                (
                    pos
                    for pos, level, _text in headings
                    if pos > m.end() and level <= current_level
                ),
                None,
            )
        else:
            boundary = next(
                (pos for pos, _level, _text in headings if pos > m.end()),
                None,
            )
        if boundary is not None:
            content_end = min(content_end, boundary)

        description = _clean_task_description(body[m.end():content_end])
        if description:
            task["description"] = description
        results.append(task)
    return results
def get_tasks_from_workplan(meta: dict, body: str) -> list[dict]:
    """Return the workplan's tasks from fenced blocks or the frontmatter list.

    Fenced ```task blocks in ``body`` take precedence. Only when there are none
    is the ``tasks`` entry of ``meta`` used, and only if it is a list
    (activity-core style). Otherwise an empty list is returned.
    """
    fenced = parse_task_blocks(body)
    if not fenced:
        listed = meta.get("tasks")
        return listed if isinstance(listed, list) else []
    return fenced
def _as_list(value: Any) -> list[str]:
    """Coerce a frontmatter value into a list of trimmed strings.

    ``None`` gives ``[]``. A list is converted item by item and a string is
    split on commas, with blank entries dropped in both cases. Any other value
    becomes a single-item list. Double quotes around entries are removed.
    """

    def _tidy(token: Any) -> str:
        return str(token).strip().strip('"')

    if value is None:
        return []
    if isinstance(value, str):
        return [_tidy(piece) for piece in value.split(",") if piece.strip()]
    if isinstance(value, list):
        return [_tidy(entry) for entry in value if str(entry).strip()]
    return [_tidy(value)]
def _as_int_or_none(value: Any) -> int | None:
    """Convert ``value`` to ``int``, or return ``None`` if that is not possible.

    Null-like markers (``None``, empty string, ``~``, ``null``, ``None``,
    ``none``) and anything ``int()`` rejects all give ``None``.
    """
    null_markers = (None, "", "~", "null", "None", "none")
    if value in null_markers:
        return None
    try:
        number = int(value)
    except (TypeError, ValueError):
        number = None
    return number
# ---------------------------------------------------------------------------
# File update helpers
# ---------------------------------------------------------------------------
def _add_frontmatter_field(file_path: Path, key: str, value: str) -> None:
    """Insert ``key: "value"`` into the frontmatter just before its closing ``---``.

    The file is left untouched when it does not start with ``---`` or has no
    closing ``---`` line. ``value`` is written between double quotes as-is,
    without escaping.
    """
    text = file_path.read_text(encoding="utf-8")
    if not text.startswith("---"):
        # Same guard as _patch_frontmatter_field: without it, the first "---"
        # rule in a frontmatter-less body would be mistaken for the closer.
        return
    lines = text.split("\n")
    close_idx = next(
        (i for i, line in enumerate(lines[1:], 1) if line.strip() == "---"),
        None,
    )
    if close_idx is None:
        return
    lines.insert(close_idx, f'{key}: "{value}"')
    file_path.write_text("\n".join(lines), encoding="utf-8")
def _patch_frontmatter_field(file_path: Path, key: str, value: str) -> bool:
    """Set scalar frontmatter field ``key`` to ``value``, editing a single line.

    The first frontmatter line for ``key`` is replaced; if there is none, a new
    line is inserted before the closing ``---``. Returns ``True`` when the file
    was written. Returns ``False`` when there is no frontmatter, no closing
    marker, or the line already has exactly the wanted content.
    """
    text = file_path.read_text(encoding="utf-8")
    if not text.startswith("---"):
        return False
    lines = text.split("\n")
    close_idx = next(
        (pos for pos, row in enumerate(lines[1:], 1) if row.strip() == "---"),
        None,
    )
    if close_idx is None:
        return False

    new_line = f"{key}: {value}"
    key_pattern = re.compile(rf"^\s*{re.escape(key)}\s*:")
    target = next(
        (pos for pos in range(1, close_idx) if key_pattern.match(lines[pos])),
        None,
    )
    if target is None:
        lines.insert(close_idx, new_line)
    elif lines[target] == new_line:
        # Already up to date: no write.
        return False
    else:
        lines[target] = new_line
    file_path.write_text("\n".join(lines), encoding="utf-8")
    return True
def _inject_task_id_into_block(
    file_path: Path, field_name: str, field_value: str, match_id: str
) -> bool:
    """Set ``field_name`` in the ```task``` block whose ``id`` equals ``match_id``.

    A block that already has a non-empty value for ``field_name`` is left
    alone. An existing empty or null line for the field is replaced in place;
    otherwise the field is appended to the block. Returns ``True`` only when
    the file content changed and was written back.
    """
    empty_markers = ("", "~", "null", "None", "none")
    field_line_pattern = rf"^{re.escape(field_name)}:.*$"
    rendered = f'{field_name}: "{field_value}"'
    source = file_path.read_text(encoding="utf-8")

    def _rewrite(found: re.Match) -> str:
        inner = found.group(1)
        parsed = _parse_yaml_block(inner.strip())
        if str(parsed.get("id", "")) != match_id:
            return found.group(0)
        current = parsed.get(field_name)
        if current is not None and str(current).strip() not in empty_markers:
            return found.group(0)
        # Overwrite a null/empty line for the field when there is one ...
        updated = re.sub(field_line_pattern, rendered, inner, flags=re.MULTILINE)
        if updated == inner:
            # ... otherwise add the field at the end of the block.
            updated = inner.rstrip() + "\n" + rendered
        return f"```task\n{updated}\n```"

    rewritten = _TASK_BLOCK_RE.sub(_rewrite, source)
    if rewritten == source:
        return False
    file_path.write_text(rewritten, encoding="utf-8")
    return True
def _inject_task_id_frontmatter_list(
    file_path: Path, field_value: str, match_id: str
) -> bool:
    """Inject state_hub_task_id into a task entry in frontmatter tasks: list.

    Entries whose ``id`` equals ``match_id`` receive ``state_hub_task_id`` when
    they have no value for it yet. A key that is present but empty or null
    (``~``, ``null``, ...) counts as missing, as in
    ``_inject_task_id_into_block``. Returns ``True`` only when the file was
    rewritten. Returns ``False`` when PyYAML is unavailable, ``tasks`` is not
    a list, nothing matched, or serialising/writing failed.

    NOTE: the whole frontmatter is re-serialised with ``yaml.dump``, so key
    order and formatting of the frontmatter may change.
    """
    if not _HAS_YAML:
        return False
    text = file_path.read_text(encoding="utf-8")
    meta, _body = parse_frontmatter(text)
    tasks = meta.get("tasks") if isinstance(meta, dict) else None
    if not isinstance(tasks, list):
        # Missing, null, or malformed tasks entry: nothing to inject into.
        return False

    changed = False
    for entry in tasks:
        if not isinstance(entry, dict):
            continue
        if str(entry.get("id", "")) != match_id:
            continue
        current = entry.get("state_hub_task_id")
        if current is not None and str(current).strip() not in ("", "~", "null", "None", "none"):
            continue  # already linked
        entry["state_hub_task_id"] = field_value
        changed = True
    if not changed:
        return False

    parts = text.split("---", 2)
    if len(parts) < 3:
        return False
    try:
        new_meta_str = _yaml.dump(meta, allow_unicode=True, default_flow_style=False)
        file_path.write_text(f"---\n{new_meta_str}---{parts[2]}", encoding="utf-8")
    except (_yaml.YAMLError, OSError, ValueError):
        # Best effort: a failed dump or write is reported as "not applied".
        return False
    return True
# ---------------------------------------------------------------------------
# API helpers
# ---------------------------------------------------------------------------
def _api_get(
    api_base: str,
    path: str,
    params: dict | None = None,
    *,
    return_error: bool = False,
) -> Any:
    """GET ``path`` from the API and return the decoded JSON body.

    A 404 always yields ``None``. Any other failure yields ``None`` too,
    unless ``return_error`` is set, in which case ``{"_error": <message>}`` is
    returned (this also covers httpx not being installed). Query parameters
    whose value is ``None`` are dropped.
    """
    if not _HAS_HTTPX:
        return {"_error": "httpx is not installed"} if return_error else None
    endpoint = path
    # A trailing slash is added to plain paths only, never after a query string.
    if "?" not in endpoint and not endpoint.endswith("/"):
        endpoint = endpoint + "/"
    try:
        with _httpx.Client(base_url=api_base, timeout=10.0, follow_redirects=True) as client:
            query = {name: val for name, val in (params or {}).items() if val is not None}
            response = client.get(endpoint, params=query or None)
            response.raise_for_status()
            return response.json()
    except _httpx.HTTPStatusError as exc:
        missing = exc.response.status_code == 404
        if return_error and not missing:
            return {"_error": str(exc)}
        return None
    except Exception as exc:
        return {"_error": str(exc)} if return_error else None
def _api_patch(api_base: str, path: str, body: dict) -> Any:
    """PATCH ``body`` as JSON to ``path`` and return the decoded JSON response.

    Returns ``None`` when httpx is not installed. Any failure is reported as
    ``{"_error": <message>}`` so callers can tell an API error from success
    instead of silently dropping the fix.
    """
    if not _HAS_HTTPX:
        return None
    endpoint = path if path.endswith("/") else path + "/"
    try:
        with _httpx.Client(base_url=api_base, timeout=10.0, follow_redirects=True) as client:
            response = client.patch(endpoint, json=body)
            response.raise_for_status()
            payload = response.json()
    except Exception as exc:
        return {"_error": str(exc)}
    return payload
def _api_post(api_base: str, path: str, body: dict) -> Any:
    """POST ``body`` as JSON to ``path`` and return the decoded JSON response.

    Returns ``None`` when httpx is not installed and also on any failure; no
    error detail is reported to the caller.
    """
    if not _HAS_HTTPX:
        return None
    endpoint = path if path.endswith("/") else path + "/"
    try:
        with _httpx.Client(base_url=api_base, timeout=10.0, follow_redirects=True) as client:
            response = client.post(endpoint, json=body)
            response.raise_for_status()
            payload = response.json()
    except Exception:
        return None
    return payload
# ---------------------------------------------------------------------------
# Core check engine
# ---------------------------------------------------------------------------
def resolve_repo_path(repo: dict, override: str | None = None) -> str:
    """Work out where a repo is checked out on the current machine.

    Lookup order:
    1. ``override`` (the explicit --repo-path CLI value), when non-empty
    2. ``repo["host_paths"][<current hostname>]``, the per-machine path
    3. ``repo["local_path"]``, the legacy single-path field
    Returns an empty string when none of these gives a value.
    """
    if override:
        return override
    this_host = socket.gethostname()
    per_host = repo.get("host_paths") or {}
    for candidate in (per_host.get(this_host), repo.get("local_path")):
        if candidate:
            return candidate
    return ""
def _infer_slug_from_path(api_base: str, path: str) -> "tuple[str, str] | None":
    """Identify a registered repo from a local checkout path.

    Strategy (in order):
    1. Root-commit fingerprint — ``git rev-list --max-parents=0 HEAD`` produces
       the same SHA-1 on every clone, independent of remote URL or checkout path.
       Looked up via ``GET /repos/by-fingerprint?hash=<sha>``, first together
       with the local ``origin`` URL and then by fingerprint alone.
    2. Remote URL fallback — exact string match against ``remote_url`` field.
       Less reliable across machines (SSH aliases, HTTP vs HTTPS, etc.) but
       useful when fingerprint is not yet stored.

    Returns ``(slug, git_root)`` on success, ``None`` if no match found or if
    ``path`` is not inside a git work tree (or git cannot be run).
    """
    # Resolve the checkout root; every later git command runs from there.
    try:
        git_root = subprocess.check_output(
            ["git", "rev-parse", "--show-toplevel"],
            cwd=path, stderr=subprocess.DEVNULL, text=True,
        ).strip()
    except (subprocess.CalledProcessError, FileNotFoundError, OSError):
        return None
    # Strategy 1: fingerprint lookup (most reliable)
    # NOTE(review): a history with several root commits prints one SHA per line,
    # so the fingerprint would then contain newlines — confirm how that case is registered.
    try:
        fingerprint = subprocess.check_output(
            ["git", "rev-list", "--max-parents=0", "HEAD"],
            cwd=git_root, stderr=subprocess.DEVNULL, text=True,
        ).strip()
    except (subprocess.CalledProcessError, FileNotFoundError, OSError):
        fingerprint = ""
    # Get local remote URL once — used for both disambiguation and fallback
    try:
        remote_url = subprocess.check_output(
            ["git", "remote", "get-url", "origin"],
            cwd=git_root, stderr=subprocess.DEVNULL, text=True,
        ).strip()
    except (subprocess.CalledProcessError, FileNotFoundError, OSError):
        remote_url = ""
    if fingerprint:
        # Try fingerprint + remote URL for precise match
        if remote_url:
            import urllib.parse as _up
            candidates = _api_get(
                api_base,
                f"/repos/by-fingerprint?hash={fingerprint}&remote_url={_up.quote(remote_url, safe='')}",
            )
            # Only an unambiguous single hit is accepted here; anything else falls through.
            if isinstance(candidates, list) and len(candidates) == 1:
                return candidates[0]["slug"], git_root
        # Fingerprint alone (works when repos don't share ancestry)
        candidates = _api_get(api_base, f"/repos/by-fingerprint?hash={fingerprint}")
        if isinstance(candidates, list):
            if len(candidates) == 1:
                return candidates[0]["slug"], git_root
            if len(candidates) > 1:
                # Disambiguate: prefer the repo whose slug appears in the git_root path
                # (plain substring test, so a short slug can match an unrelated path component).
                for repo in candidates:
                    if repo["slug"] in git_root:
                        return repo["slug"], git_root
                # Can't disambiguate — return first match with a warning
                print(
                    f" WARNING: {len(candidates)} repos share fingerprint {fingerprint[:12]}"
                    f"— using '{candidates[0]['slug']}'. "
                    "Set remote_url on each repo for accurate matching.",
                    file=sys.stderr,
                )
                return candidates[0]["slug"], git_root
    # Strategy 2: remote URL exact match (fallback)
    if remote_url:
        import urllib.parse as _up
        repo = _api_get(api_base, f"/repos/by-remote?url={_up.quote(remote_url, safe='')}")
        if repo and isinstance(repo, dict) and "slug" in repo:
            return repo["slug"], git_root
    return None
def check_repo(api_base: str, repo_slug: str, repo_path_override: str | None = None) -> ConsistencyReport:
"""Run all consistency checks for a registered repo."""
repo = _api_get(api_base, f"/repos/{repo_slug}", return_error=True)
if isinstance(repo, dict) and "_error" in repo:
report = ConsistencyReport(repo_slug=repo_slug, repo_path=repo_path_override or "(unknown)")
report.add(
severity="FAIL", check_id="C-00",
message=(
f"Could not query State Hub API at {api_base}: {repo['_error']}"
),
fixable=False,
)
return report
if repo is None:
report = ConsistencyReport(repo_slug=repo_slug, repo_path="(unknown)")
report.add(
severity="FAIL", check_id="C-00",
message=f"Repo '{repo_slug}' not found in state-hub DB",
fixable=False,
)
return report
repo_id: str = repo["id"]
repo_path: str = resolve_repo_path(repo, repo_path_override)
report = ConsistencyReport(repo_slug=repo_slug, repo_path=repo_path)
if not repo_path:
report.add(
severity="FAIL", check_id="C-00",
message=f"Repo '{repo_slug}' has no local_path — cannot check workplan files",
fixable=False,
)
_check_orphan_db(api_base, repo_id, set(), report)
return report
repo_dir = Path(repo_path)
workplans_dir = repo_dir / "workplans"
# C-01: workplans/ directory missing
if not workplans_dir.is_dir():
report.add(
severity="FAIL", check_id="C-01",
message=(
"workplans/ directory missing — ADR-001 requires workplan files "
"at <repo>/workplans/<ID>-<slug>.md"
),
fixable=False,
)
_check_orphan_db(api_base, repo_id, set(), report)
return report
# Parse workplan files
workplan_infos: list[tuple[Path, dict, str]] = []
file_ws_ids: dict[str, tuple[Path, dict, str]] = {} # ws_id → (file, meta, body)
active_file_ws_ids: set[str] = set()
for wp_file in iter_workplan_files(workplans_dir):
try:
text = wp_file.read_text(encoding="utf-8")
except OSError as e:
report.add(severity="FAIL", check_id="C-02",
message=f"Cannot read file: {e}", file_path=workplan_display_path(repo_dir, wp_file))
continue
if not text.startswith("---"):
report.add(severity="FAIL", check_id="C-02",
message="No YAML frontmatter found", file_path=workplan_display_path(repo_dir, wp_file))
continue
meta, body = parse_frontmatter(text)
if not meta or meta.get("_parse_error"):
report.add(severity="FAIL", check_id="C-02",
message="YAML frontmatter parse error", file_path=workplan_display_path(repo_dir, wp_file))
continue
workplan_infos.append((wp_file, meta, body))
ws_id = str(meta.get("state_hub_workstream_id", "")).strip().strip('"')
if ws_id:
file_ws_ids[ws_id] = (wp_file, meta, body)
if wp_file.parent == workplans_dir:
active_file_ws_ids.add(ws_id)
workplan_id_to_ws_id: dict[str, str] = {}
task_file_id_to_sh_id: dict[str, str] = {}
for _wp_file, meta, body in workplan_infos:
mapped_ws_id = str(meta.get("state_hub_workstream_id", "")).strip().strip('"')
wp_id = str(meta.get("id", "")).strip()
if wp_id and mapped_ws_id:
workplan_id_to_ws_id[wp_id] = mapped_ws_id
for task in get_tasks_from_workplan(meta, body):
if task.get("_parse_error"):
continue
task_file_id = str(task.get("id", "")).strip()
raw_sh = task.get("state_hub_task_id")
task_sh_id = "" if raw_sh is None else str(raw_sh).strip().strip('"')
if task_file_id and task_sh_id and task_sh_id not in ("~", "null", "None", "none"):
task_file_id_to_sh_id[task_file_id] = task_sh_id
# Per-workplan checks
for wp_file, meta, body in workplan_infos:
fname = workplan_display_path(repo_dir, wp_file)
archived_file = wp_file.parent.name == "archived"
ws_id = str(meta.get("state_hub_workstream_id", "")).strip().strip('"')
file_status = str(meta.get("status", "")).strip()
file_title = str(meta.get("title", "")).strip()
file_domain = str(meta.get("domain", "")).strip()
normalised_file_status = normalise_workstream_status(file_status)
if archived_file and normalised_file_status not in CLOSED_WORKSTREAM_STATUSES:
report.add(
severity="FAIL", check_id="C-18",
message="Archived workplan file has an open or planning status",
file_path=fname,
file_value=file_status,
fixable=False,
)
if not ws_id:
# C-06: workplan not linked to any DB workstream
report.add(
severity="WARN", check_id="C-06",
message="Workplan has no state_hub_workstream_id — not indexed in DB",
file_path=fname,
file_value=file_title or fname,
fixable=True,
_fix_context={
"wp_file": str(wp_file),
"meta": meta,
"body": body,
"repo_id": repo_id,
"domain": file_domain,
},
)
continue
ws = _api_get(api_base, f"/workstreams/{ws_id}")
if ws is None:
# C-03: stale workstream reference
report.add(
severity="FAIL", check_id="C-03",
message=f"state_hub_workstream_id {ws_id[:8]}… not found in DB (stale reference)",
file_path=fname,
db_id=ws_id,
fixable=False,
)
continue
# C-09: repo mismatch — file is here but DB says different repo
db_repo_id: str = ws.get("repo_id") or ""
if db_repo_id != repo_id:
report.add(
severity="FAIL", check_id="C-09",
message=(
f"Workstream '{ws.get('slug')}' repo_id in DB is "
f"{db_repo_id[:8] if db_repo_id else 'None'} "
f"but backing file lives in {repo_slug} ({repo_id[:8]}…)"
),
file_path=fname,
db_id=ws_id,
file_value=repo_id,
db_value=db_repo_id,
fixable=True,
_fix_context={"ws_id": ws_id, "correct_repo_id": repo_id},
)
# Continue to check drift even with mismatched repo
tasks = get_tasks_from_workplan(meta, body)
db_tasks = _api_get(api_base, "/tasks", {"workstream_id": ws_id})
file_task_statuses = [
str(task.get("status", "")).strip()
for task in tasks
if not task.get("_parse_error")
]
db_task_statuses = [
str(task.get("status", "")).strip()
for task in db_tasks
if isinstance(db_tasks, list)
] if isinstance(db_tasks, list) else []
active_task_requires_activation = should_activate_parent_for_active_tasks(
parent_workstream_status=file_status,
task_statuses=[*file_task_statuses, *db_task_statuses],
)
if active_task_requires_activation:
report.add(
severity="WARN",
check_id="C-23",
message=(
f"Lifecycle drift in '{ws.get('slug')}': workplan status "
f"{file_status!r} has an active task — repair to 'active'"
),
file_path=fname,
db_id=ws_id,
file_value=file_status,
db_value=ws.get("status", ""),
fixable=True,
_fix_context={
"ws_id": ws_id,
"wp_file": str(wp_file),
"file_status": file_status,
"db_status": ws.get("status", ""),
"target_status": "active",
},
)
# C-04: status drift — normalise file value before comparing so that
# legacy file/API aliases are not treated as drift. Lifecycle repairs
# take precedence so a stale planning file cannot regress active work.
db_status = ws.get("status", "")
normalised_db_status = normalise_workstream_status(db_status)
if (
not active_task_requires_activation
and file_status
and db_status
and normalised_file_status != normalised_db_status
):
report.add(
severity="WARN", check_id="C-04",
message=(
f"Status drift in '{ws.get('slug')}': "
f"file={file_status!r} db={db_status!r} (file wins)"
),
file_path=fname,
db_id=ws_id,
file_value=file_status,
db_value=db_status,
fixable=True,
_fix_context={
"ws_id": ws_id,
"field": "status",
"value": normalised_file_status, # always send DB-valid value
},
)
if normalised_file_status == "ready":
review = ready_review_status(
repo_dir,
meta.get("reviewed_against_commit"),
meta.get("context_paths"),
)
if review.needs_review:
detail = f"Ready workplan may be stale: {review.reason}"
if review.changed_paths:
preview = ", ".join(review.changed_paths[:5])
extra = "" if len(review.changed_paths) <= 5 else ", ..."
detail = f"{detail}; changed paths: {preview}{extra}"
report.add(
severity="WARN",
check_id="C-21",
message=detail,
file_path=fname,
file_value=file_status,
db_value="needs_review",
fixable=False,
)
# C-05: title drift
db_title = ws.get("title", "")
if file_title and db_title and file_title != db_title:
report.add(
severity="WARN", check_id="C-05",
message=(
f"Title drift in '{ws.get('slug')}': "
f"file={file_title!r} db={db_title!r} (file wins)"
),
file_path=fname,
db_id=ws_id,
file_value=file_title,
db_value=db_title,
fixable=True,
_fix_context={"ws_id": ws_id, "field": "title", "value": file_title},
)
planning_priority = str(meta.get("planning_priority", "")).strip() or None
if planning_priority != (ws.get("planning_priority") or None):
report.add(
severity="WARN", check_id="C-19",
message=(
f"Planning priority drift in '{ws.get('slug')}': "
f"file={planning_priority!r} db={ws.get('planning_priority')!r} (file wins)"
),
file_path=fname,
db_id=ws_id,
file_value=planning_priority,
db_value=ws.get("planning_priority"),
fixable=True,
_fix_context={"ws_id": ws_id, "field": "planning_priority", "value": planning_priority},
)
planning_order = _as_int_or_none(meta.get("planning_order"))
if planning_order != ws.get("planning_order"):
report.add(
severity="WARN", check_id="C-19",
message=(
f"Planning order drift in '{ws.get('slug')}': "
f"file={planning_order!r} db={ws.get('planning_order')!r} (file wins)"
),
file_path=fname,
db_id=ws_id,
file_value=planning_order,
db_value=ws.get("planning_order"),
fixable=True,
_fix_context={"ws_id": ws_id, "field": "planning_order", "value": planning_order},
)
# C-10, C-11, C-12: task-level checks
db_task_by_id: dict[str, dict] = {}
if isinstance(db_tasks, list):
for t in db_tasks:
db_task_by_id[t["id"]] = t
existing_deps = _api_get(api_base, f"/workstreams/{ws_id}/dependencies") or []
existing_dep_keys = set()
if isinstance(existing_deps, list):
for dep in existing_deps:
if dep.get("from_workstream_id") != ws_id:
continue
rel = dep.get("relationship_type") or "blocks"
if dep.get("to_workstream_id"):
existing_dep_keys.add(("workstream", dep["to_workstream_id"], rel))
if dep.get("to_task_id"):
existing_dep_keys.add(("task", dep["to_task_id"], rel))
for target_wp_id in _as_list(meta.get("depends_on_workplans")):
target_ws_id = workplan_id_to_ws_id.get(target_wp_id)
if not target_ws_id:
report.add(
severity="WARN",
check_id="C-20",
message=f"Workplan dependency target '{target_wp_id}' is not linked to State Hub",
file_path=fname,
file_value=target_wp_id,
fixable=False,
)
continue
dep_key = ("workstream", target_ws_id, "blocks")
if dep_key not in existing_dep_keys:
report.add(
severity="WARN",
check_id="C-20",
message=f"Missing DB dependency edge: {ws_id[:8]}… depends on workplan {target_wp_id}",
file_path=fname,
db_id=ws_id,
file_value=target_wp_id,
fixable=True,
_fix_context={
"from_workstream_id": ws_id,
"to_workstream_id": target_ws_id,
"relationship_type": "blocks",
},
)
for target_task_id in _as_list(meta.get("depends_on_tasks")):
target_sh_id = task_file_id_to_sh_id.get(target_task_id)
if not target_sh_id:
report.add(
severity="WARN",
check_id="C-20",
message=f"Task dependency target '{target_task_id}' is not linked to State Hub",
file_path=fname,
file_value=target_task_id,
fixable=False,
)
continue
dep_key = ("task", target_sh_id, "starts_after")
if dep_key not in existing_dep_keys:
report.add(
severity="WARN",
check_id="C-20",
message=f"Missing DB dependency edge: {ws_id[:8]}… starts after task {target_task_id}",
file_path=fname,
db_id=ws_id,
file_value=target_task_id,
fixable=True,
_fix_context={
"from_workstream_id": ws_id,
"to_task_id": target_sh_id,
"relationship_type": "starts_after",
},
)
file_task_sh_ids: set[str] = set()
for task in tasks:
if task.get("_parse_error"):
continue
t_id = str(task.get("id", "")).strip()
_raw_sh = task.get("state_hub_task_id")
t_sh_id = "" if _raw_sh is None else str(_raw_sh).strip().strip('"')
if t_sh_id in ("~", "null", "None", "none"):
t_sh_id = ""
t_status = normalise_task_status(task.get("status", "todo"))
if t_sh_id:
file_task_sh_ids.add(t_sh_id)
db_task = _api_get(api_base, f"/tasks/{t_sh_id}")
if db_task is None:
report.add(
severity="FAIL", check_id="C-03",
message=f"state_hub_task_id {t_sh_id[:8]}… not found in DB",
file_path=f"{fname}#{t_id}",
db_id=t_sh_id,
fixable=False,
)
continue
# C-10 / C-15: task status drift
db_t_status = normalise_task_status(db_task.get("status", "todo"))
if t_status and db_t_status and t_status != db_t_status:
db_rank = STATUS_ORDER.get(db_t_status, 0)
file_rank = STATUS_ORDER.get(t_status, 0)
if db_rank >= file_rank:
# C-15: DB is already at the same rank or ahead — prevent
# regression. Writeback syncs the file to match DB.
report.add(
severity="WARN", check_id="C-15",
message=(
f"DB task '{t_id}' is ahead of file "
f"(db={db_t_status!r}, file={t_status!r}) "
f"— regression prevented; writeback will sync file"
),
file_path=f"{fname}#{t_id}",
db_id=t_sh_id,
file_value=t_status,
db_value=db_t_status,
fixable=True,
_fix_context={
"task_id": t_sh_id,
"wp_file": str(wp_file),
"task_block_id": t_id,
"db_status": db_t_status,
},
)
else:
# C-10: file is ahead — apply file→DB sync (normal drift)
report.add(
severity="WARN", check_id="C-10",
message=(
f"Task status drift '{t_id}': "
f"file={t_status!r} db={db_t_status!r} (file wins)"
),
file_path=f"{fname}#{t_id}",
db_id=t_sh_id,
file_value=t_status,
db_value=db_t_status,
fixable=True,
_fix_context={"task_id": t_sh_id, "status": t_status},
)
file_description = task.get("description")
if isinstance(file_description, str):
file_description = file_description.strip() or None
else:
file_description = None
db_description = db_task.get("description")
if isinstance(db_description, str):
db_description = db_description.strip() or None
else:
db_description = None
if file_description and file_description != db_description:
report.add(
severity="WARN", check_id="C-22",
message=f"Task description drift '{t_id}': file content differs from DB (file wins)",
file_path=f"{fname}#{t_id}",
db_id=t_sh_id,
file_value=file_description[:120],
db_value=(db_description or "")[:120],
fixable=True,
_fix_context={"task_id": t_sh_id, "description": file_description},
)
elif t_id:
# C-11: task exists in file but not linked to DB
ws_status = ws.get("status", "")
report.add(
severity="WARN", check_id="C-11",
message=f"Task '{t_id}' has no state_hub_task_id",
file_path=f"{fname}#{t_id}",
fixable=True,
_fix_context={
"ws_id": ws_id,
"ws_status": ws_status,
"task": task,
"wp_file": str(wp_file),
"meta": meta,
"body": body,
},
)
# C-12: DB tasks with no file backing
if isinstance(db_tasks, list):
ws_status = ws.get("status", "")
ws_finished = normalise_workstream_status(ws_status) in CLOSED_WORKSTREAM_STATUSES
for db_t in db_tasks:
if db_t["id"] not in file_task_sh_ids:
db_t_status = db_t.get("status", "")
open_task = db_t_status not in TERMINAL_TASK_STATUSES
# Auto-cancel fixable when workstream is finished and task is open
fixable = ws_finished and open_task
report.add(
severity="WARN", check_id="C-12",
message=(
f"DB task '{db_t.get('title', '')}' "
f"(id={db_t['id'][:8]}…, status={db_t_status}) "
f"in workstream '{ws.get('slug')}' has no file backing"
),
db_id=db_t["id"],
fixable=fixable,
_fix_context={
"task_id": db_t["id"],
"ws_finished": ws_finished,
},
)
# C-13: all DB tasks done but workstream still active — worker forgot to close
db_status = ws.get("status", "")
if normalise_workstream_status(db_status) == "active" and isinstance(db_tasks, list) and db_tasks:
non_terminal = [
t for t in db_tasks
if normalise_task_status(t.get("status", "todo")) not in TERMINAL_TASK_STATUSES
]
if not non_terminal:
report.add(
severity="WARN", check_id="C-13",
message=(
f"All {len(db_tasks)} DB tasks for '{ws.get('slug')}' are "
f"done/cancel but workstream status is still 'active'"
f"worker likely forgot update_workstream_status()"
),
file_path=fname,
db_id=ws_id,
db_value="active",
fixable=True,
_fix_context={
"ws_id": ws_id,
"field": "status",
"value": "finished",
},
)
# C-07 / C-08: orphan DB workstreams (have repo_id=this_repo but no backing file)
_check_orphan_db(api_base, repo_id, set(file_ws_ids.keys()), report, active_file_ws_ids)
# C-14: ghost duplicate — active workstream on same topic with no repo_id whose
# title matches a file-backed workstream. Root cause: create_workstream() called
# before the workplan file was written; fix-consistency then created a second
# workstream from the file, leaving the first as an invisible orphan.
_check_ghost_duplicates(api_base, workplan_infos, file_ws_ids, report)
return report
def _check_orphan_db(
    api_base: str,
    repo_id: str,
    file_ws_ids: set[str],
    report: ConsistencyReport,
    active_file_ws_ids: set[str] | None = None,
) -> None:
    """Flag DB workstreams with repo_id=this_repo that have no backing workplan file.

    Adds a C-07 FAIL for every non-closed workstream and a C-08 INFO for every
    closed workstream that is not backed by a file.

    Args:
        api_base: Base URL handed to ``_api_get``.
        repo_id: Repo id used to filter the ``/workstreams`` listing.
        file_ws_ids: Workstream ids accepted as backing for closed workstreams.
        report: Report that receives the issues.
        active_file_ws_ids: Workstream ids accepted as backing for non-closed
            workstreams.  ``None`` means "same as *file_ws_ids*"; an empty set
            is honoured as "no such files".
    """
    # Only fall back when the argument was omitted.  Using ``or`` here would
    # also replace an explicitly empty set and hide C-07 orphans.
    if active_file_ws_ids is None:
        active_file_ws_ids = file_ws_ids

    all_ws = _api_get(api_base, "/workstreams", {"repo_id": repo_id})
    if not isinstance(all_ws, list):
        return

    for ws in all_ws:
        ws_id = ws["id"]
        ws_status = ws.get("status", "")
        is_closed = normalise_workstream_status(ws_status) in CLOSED_WORKSTREAM_STATUSES

        # Closed and non-closed workstreams are matched against different id sets.
        backing_ids = file_ws_ids if is_closed else active_file_ws_ids
        if ws_id in backing_ids:
            continue

        ws_slug = ws.get("slug", "")
        if is_closed:
            report.add(
                severity="INFO", check_id="C-08",
                message=(
                    f"Closed DB workstream '{ws_slug}' "
                    f"(id={ws_id[:8]}…, status={ws_status}) has no backing workplan file"
                ),
                db_id=ws_id,
                fixable=False,
            )
        else:
            report.add(
                severity="FAIL", check_id="C-07",
                message=(
                    f"Non-closed DB workstream '{ws_slug}' (id={ws_id[:8]}…) "
                    f"has no backing workplan file — ADR-001 violation"
                ),
                db_id=ws_id,
                fixable=False,
            )
def _check_ghost_duplicates(
    api_base: str,
    workplan_infos: list[tuple],
    file_ws_ids: dict[str, tuple],
    report: ConsistencyReport,
) -> None:
    """C-14: detect active workstreams with repo_id=null whose title matches a
    file-backed workstream — these are ghosts created by premature create_workstream()
    calls before the workplan file existed.

    Args:
        api_base: Base URL handed to ``_api_get``.
        workplan_infos: 3-tuples whose middle element is the workplan
            frontmatter mapping (the other two elements are not used here).
        file_ws_ids: Mapping keyed by the workstream ids referenced from files;
            only the keys are used.
        report: Report that receives one C-14 WARN per ghost found.
    """
    # Build lookup: normalised title → file-backed workstream id
    file_titles: dict[str, str] = {}
    for _, meta, _ in workplan_infos:
        raw_ws_id = meta.get("state_hub_workstream_id")
        raw_title = meta.get("title")
        # A YAML null arrives as None; str(None) would otherwise turn into the
        # truthy text "None" and register an unlinked workplan as file-backed.
        ws_id = "" if raw_ws_id is None else str(raw_ws_id).strip().strip('"')
        if ws_id in ("~", "null", "None", "none"):
            ws_id = ""
        title = "" if raw_title is None else str(raw_title).strip().lower()
        if title and ws_id:
            file_titles[title] = ws_id
    if not file_titles:
        return

    # Gather topic_ids from all file-backed workstreams so we can query by topic
    topic_ids: set[str] = set()
    for ws_id in file_ws_ids:
        ws = _api_get(api_base, f"/workstreams/{ws_id}")
        if ws and ws.get("topic_id"):
            topic_ids.add(ws["topic_id"])

    for topic_id in topic_ids:
        topic_ws: list[dict] = []
        for status in OPEN_WORKSTREAM_STATUSES:
            status_rows = _api_get(
                api_base, "/workstreams", {"topic_id": topic_id, "status": status}
            )
            if isinstance(status_rows, list):
                topic_ws.extend(status_rows)

        for ws in topic_ws:
            ws_id = ws["id"]
            if ws_id in file_ws_ids:
                continue  # legitimately linked
            if ws.get("repo_id"):
                continue  # C-07 covers repo-scoped orphans
            # The DB title may be null; treat that as "no title" instead of crashing.
            ws_title = str(ws.get("title") or "").strip().lower()
            if ws_title not in file_titles:
                continue
            file_backed_id = file_titles[ws_title]
            report.add(
                severity="WARN",
                check_id="C-14",
                message=(
                    f"Ghost duplicate: active workstream '{ws.get('slug')}' "
                    f"(id={ws_id[:8]}…, repo_id=null) has same title as "
                    f"file-backed workstream {file_backed_id[:8]}… — "
                    f"likely created via create_workstream() before workplan file existed; "
                    f"archive it"
                ),
                db_id=ws_id,
                fixable=False,
            )
# ---------------------------------------------------------------------------
# Git sync (T02–T04: pull gate, writeback, push seal) — see repo_sync.py
# ---------------------------------------------------------------------------
# repo_sync.py owns the push-seal invariant and all git lifecycle primitives.
# The aliases below keep internal call sites and existing tests unchanged.
from repo_sync import ( # noqa: E402 (import after top-level imports intentional)
count_local_ahead as _detect_ahead_of_remote,
count_remote_ahead as _count_remote_ahead,
pull_ff as _git_pull,
push_ff as _git_push,
)
def _detect_behind_remote(repo_path: str) -> bool:
    """Report whether the remote holds commits the local repo lacks (C-16 predicate).

    Thin wrapper over ``repo_sync.count_remote_ahead``: the repo counts as
    behind when that helper reports at least one commit.  Fetching and error
    handling are left to the helper — per the existing notes it fetches before
    counting and errors must never trigger C-16 (presumably it reports 0 on
    failure; confirm in repo_sync.py).
    """
    missing_locally = _count_remote_ahead(repo_path)
    return missing_locally > 0
def _patch_task_status_in_file(
    file_path: Path, task_block_id: str, new_status: str
) -> bool:
    """Update the ``status:`` field inside a task block identified by its id.

    Only modifies lines inside a ```task … ``` fenced block whose ``id:``
    matches *task_block_id*.  Returns True if the file was changed, False when
    no block matched or the status already had the requested value.
    """
    text = file_path.read_text(encoding="utf-8")

    def _set_status(line: re.Match) -> str:
        # Keep the original spacing after the colon; an empty value had none,
        # so insert a single space.  Building the text in a callable also
        # means *new_status* is never interpreted as a replacement template.
        return f"{line.group(1)}{line.group(2) or ' '}{new_status}"

    def _replace(m: re.Match) -> str:
        # m.group(0) is the full ```task...``` block including fences.
        # m.group(1) is the inner YAML content (no fences).
        full_block = m.group(0)
        task_meta = _parse_yaml_block(m.group(1).strip())
        if str(task_meta.get("id", "")).strip() != task_block_id:
            return full_block
        # Replace the status value only, leave everything else untouched.
        # ``[ \t]*`` (not ``\s*``) keeps the match on one line: with ``\s*`` an
        # empty ``status:`` value let the pattern run on and overwrite the
        # first token of the following line.
        return re.sub(
            r"^(status:)([ \t]*)\S*",
            _set_status,
            full_block,
            flags=re.MULTILINE,
        )

    new_text = _TASK_BLOCK_RE.sub(_replace, text)
    if new_text == text:
        return False
    file_path.write_text(new_text, encoding="utf-8")
    return True
def _git_commit_writeback(
    repo_path: str,
    file_path: Path,
    changes: list[str],
    *,
    subject: str = "chore(consistency): sync task status from DB [auto]",
) -> bool:
    """Stage *file_path* and commit with a standard writeback message.

    Args:
        repo_path: Repository directory passed to ``git -C``.
        file_path: File to stage before committing.
        changes: Human-readable change lines listed in the commit body.
        subject: First line of the commit message.

    Returns:
        True on success, False on any error (errors are logged to stderr
        but do not abort the consistency run).
    """
    from datetime import date as _date
    import os as _os

    summary = "\n".join(f" - {c}" for c in changes)
    msg = (
        f"{subject}\n\n"
        f"Updated by fix-consistency on {_date.today().isoformat()}:\n"
        f"{summary}"
    )
    # Pass GIT_CUSTODIAN_SYNC=1 so the post-commit hook can detect it is
    # running from within a sync pass and exit early, preventing re-entrancy.
    sync_env = {**_os.environ, "GIT_CUSTODIAN_SYNC": "1"}
    try:
        for git_args in (["add", str(file_path)], ["commit", "-m", msg]):
            subprocess.run(
                ["git", "-C", repo_path, *git_args],
                check=True, capture_output=True, env=sync_env,
            )
    except subprocess.CalledProcessError as e:
        # Output is captured as bytes; decode leniently so odd bytes in git's
        # output cannot raise here.  Fall back to stdout because some git
        # failures (e.g. "nothing to commit") are reported there.
        detail = (e.stderr or b"").decode("utf-8", errors="replace").strip()
        if not detail:
            detail = (e.stdout or b"").decode("utf-8", errors="replace").strip()
        print(f"WARN: git commit failed for writeback: {detail}", file=sys.stderr)
        return False
    except OSError as e:
        # git could not be launched at all (missing binary, bad permissions).
        print(f"WARN: git commit failed for writeback: {e}", file=sys.stderr)
        return False
    return True
# ---------------------------------------------------------------------------
# Worker orientation brief (.custodian-brief.md)
# ---------------------------------------------------------------------------
# Marker emitted as the first line of the generated .custodian-brief.md.
_BRIEF_HEADER = "<!-- custodian-brief: generated by fix-consistency — do not edit manually -->"
# Marker placed in front of each task title in the brief, keyed by normalised
# task status; unknown statuses fall back to "·" at the lookup site.
# NOTE(review): the "done", "cancel" and "progress" markers are empty strings
# here — confirm whether glyphs were lost from these literals.
_TASK_STATUS_ICON = {"done": "", "cancel": "", "progress": "", "wait": "!", "todo": "·"}
# Task statuses that count as "open" when the brief lists remaining work.
_OPEN_STATUSES = set(OPEN_TASK_STATUSES)
def _write_custodian_brief(api_base: str, repo_slug: str, repo_path: str) -> bool:
    """Generate .custodian-brief.md at the repo root and git-commit if changed.

    The brief gives any agent — including subagents without MCP access and
    workers on remote machines — instant orientation without a live hub
    connection.

    Args:
        api_base: Base URL handed to ``_api_get``.
        repo_slug: Slug used to look up the repo and its goals.
        repo_path: Directory in which the brief is written and committed.

    Returns:
        True if the file was written (content changed, ignoring the
        "Last synced" line); False if the repo lookup failed or nothing
        meaningful changed.
    """
    import datetime as _dt
    from datetime import timezone as _tz

    # Open tasks listed per workstream before the remainder is summarised.
    max_listed_tasks = 7
    # Show wait first, then progress, then todo.
    priority_order = {"wait": 0, "progress": 1, "todo": 2}

    repo = _api_get(api_base, f"/repos/{repo_slug}")
    if not repo:
        return False
    repo_id: str = repo.get("id", "")
    domain_slug: str = ""

    # Resolve domain slug: prefer active workstreams, fall back to any workstream
    # so that a fully-finished repo doesn't degrade to "(unknown)".
    workstreams: list[dict] = []
    for status in OPEN_WORKSTREAM_STATUSES:
        rows = _api_get(api_base, "/workstreams", {"repo_id": repo_id, "status": status}) or []
        if isinstance(rows, list):
            workstreams.extend(rows)
    ws_for_domain = workstreams
    if not ws_for_domain:
        all_ws = _api_get(api_base, "/workstreams", {"repo_id": repo_id}) or []
        ws_for_domain = all_ws if isinstance(all_ws, list) else []
    if ws_for_domain:
        # Without a topic id the request would hit the bare /topics/ route,
        # whose response need not be a single topic mapping — skip it.
        topic_id = ws_for_domain[0].get("topic_id")
        topic = _api_get(api_base, f"/topics/{topic_id}") if topic_id else None
        if isinstance(topic, dict):
            domain_slug = topic.get("domain_slug") or ""

    # Active repo goal (first active one if multiple)
    goal_text = ""
    goals = _api_get(api_base, "/repo-goals", {"repo_slug": repo_slug}) or []
    if isinstance(goals, list):
        active_goals = [g for g in goals if g.get("status") == "active"]
        if active_goals:
            g = active_goals[0]
            goal_text = g.get("title", "") or g.get("description", "")

    now_utc = _dt.datetime.now(_tz.utc)
    ts = now_utc.strftime("%Y-%m-%d %H:%M UTC")
    lines = [
        _BRIEF_HEADER,
        f"# Custodian Brief — {repo_slug}",
        "",
        f"**Domain:** {domain_slug or '(unknown)'} ",
        f"**Last synced:** {ts} ",
        "**State Hub:** http://127.0.0.1:8000 *(adjust if running on a remote machine)*",
        "",
    ]
    if goal_text:
        lines += ["## Current Goal", "", goal_text, ""]

    if workstreams:
        lines.append("## Active Workstreams")
        for ws in workstreams:
            ws_title = ws.get("title", ws.get("slug", "?"))
            ws_id = ws["id"]
            tasks = _api_get(api_base, "/tasks", {"workstream_id": ws_id}) or []
            if not isinstance(tasks, list):
                tasks = []
            done = sum(
                1 for t in tasks
                if normalise_task_status(t.get("status", "todo")) in TERMINAL_TASK_STATUSES
            )
            total = len(tasks)
            pct = f"{done}/{total}" if total else "no tasks"
            open_tasks = [
                t for t in tasks
                if normalise_task_status(t.get("status", "todo")) in _OPEN_STATUSES
            ]
            open_tasks.sort(
                key=lambda t: priority_order.get(
                    normalise_task_status(t.get("status", "todo")), 9
                )
            )
            lines += [
                "",
                f"### {ws_title}",
                f"Progress: {pct} done | workstream_id: `{ws_id}`",
            ]
            if open_tasks:
                lines.append("")
                lines.append("**Open tasks:**")
                for t in open_tasks[:max_listed_tasks]:
                    status = normalise_task_status(t.get("status", "todo"))
                    icon = _TASK_STATUS_ICON.get(status, "·")
                    title = t.get("title", t["id"])
                    tid = t["id"]
                    blocker = t.get("blocking_reason", "")
                    task_line = f"- {icon} {title} `{tid[:8]}`"
                    if status == "wait" and blocker:
                        task_line += f"\n *(wait: {blocker})*"
                    lines.append(task_line)
                if len(open_tasks) > max_listed_tasks:
                    lines.append(
                        f"- … and {len(open_tasks) - max_listed_tasks} more open tasks"
                    )
    else:
        lines += ["## Active Workstreams", "", "*(none — repo may need first-session setup)*"]

    lines += [
        "",
        "---",
        "## MCP Orientation (when available)",
        "",
        "If the state-hub MCP server is reachable, call:",
        f"`get_domain_summary(\"{domain_slug}\")`",
        "This provides richer cross-domain context.",
        "If the MCP call fails, use this file as your orientation source.",
    ]
    content = "\n".join(lines) + "\n"
    brief_path = Path(repo_path) / ".custodian-brief.md"
    existing = brief_path.read_text(encoding="utf-8") if brief_path.exists() else ""

    # Strip the timestamp line before comparing to avoid spurious writes
    def _strip_ts(text: str) -> str:
        return "\n".join(
            ln for ln in text.splitlines()
            if not ln.startswith("**Last synced:**")
        )

    if _strip_ts(content) == _strip_ts(existing):
        return False  # no meaningful change
    brief_path.write_text(content, encoding="utf-8")
    # Commit the brief so remote workers can pull it
    _git_commit_writeback(
        repo_path,
        brief_path,
        [f"update .custodian-brief.md for {repo_slug}"],
    )
    return True
# ---------------------------------------------------------------------------
# Fix engine
# ---------------------------------------------------------------------------
def _fix_create_linked_task(
    api_base: str, ws_id: str, task: dict, wp_file: Path
) -> tuple[str | None, str | None]:
    """Create a DB task for one file task block and write the new id into the file.

    Returns ``(task_db_id, None)`` on success, or ``(None, reason)`` when the
    API call did not yield a task (nothing is written to the file then).
    """
    t_id = str(task.get("id", "")).strip()
    t_status = normalise_task_status(task.get("status", "todo"))
    t_priority = str(task.get("priority", "medium")).strip()
    if t_priority not in VALID_TASK_PRIORITIES:
        t_priority = "medium"
    t_data = _api_post(api_base, "/tasks", {
        "workstream_id": ws_id,
        "title": str(task.get("title", t_id)).strip() or t_id,
        "description": task.get("description") or None,
        "status": t_status,
        "priority": t_priority,
        "assignee": task.get("assignee") or None,
    })
    if not t_data:
        return None, "task creation returned no data"
    if "_error" in t_data:
        return None, str(t_data["_error"])
    t_db_id = t_data["id"]
    # Try the task-block injector first; use the frontmatter-list injector
    # only when the first one reports that it did not write the id.
    if not _inject_task_id_into_block(wp_file, "state_hub_task_id", t_db_id, t_id):
        _inject_task_id_frontmatter_list(wp_file, t_db_id, t_id)
    return t_db_id, None


def fix_repo(
    api_base: str,
    repo_slug: str,
    repo_path_override: str | None = None,
    no_writeback: bool = False,
) -> ConsistencyReport:
    """Run checks then apply all auto-fixable issues. Returns updated report.

    Stages, in order:
      1. ``check_repo``; return immediately when it reported a C-00 failure.
      2. Register this host's repo path via the API if it differs.
      3. C-16 pull gate and C-17 backlog guard — either one can end the run
         before anything is written.
      4. Apply each fixable issue; every outcome is appended to
         ``report.fixes_applied``.  An exception in one fix is recorded and
         does not stop the remaining fixes.
      5. Record the sync time, refresh the custodian brief, push.

    Args:
        api_base: Base URL handed to the ``_api_*`` helpers.
        repo_slug: Slug of the repo to check and fix.
        repo_path_override: Forwarded to ``check_repo``.
        no_writeback: When True, fixes that would edit workplan files
            (C-15 and the file half of C-23) are skipped.
    """
    report = check_repo(api_base, repo_slug, repo_path_override)
    if any(i.check_id == "C-00" for i in report.failures):
        return report

    # Auto-register this machine's path in host_paths so future runs work
    # without --repo-path. Idempotent: skipped when already correct.
    repo_path = report.repo_path
    if repo_path:
        repo_record = _api_get(api_base, f"/repos/{repo_slug}")
        if repo_record:
            hostname = socket.gethostname()
            if (repo_record.get("host_paths") or {}).get(hostname) != repo_path:
                result = _api_post(
                    api_base, f"/repos/{repo_slug}/paths",
                    {"host": hostname, "path": repo_path},
                )
                if result and "_error" not in result:
                    report.fixes_applied.append(
                        f"host_paths[{hostname}] → {repo_path}"
                    )

    # T02 — pull gate: warn and skip all write operations when local repo is
    # behind its remote tracking branch.
    if repo_path and _detect_behind_remote(repo_path):
        report.add(
            severity="WARN", check_id="C-16",
            message=(
                f"Repo '{repo_slug}' is behind its remote tracking branch — "
                f"pull before fixing to avoid clobbering remote progress. "
                f"Run: git -C {repo_path} pull --ff-only"
            ),
            fixable=False,
        )
        report.fixes_applied.append(
            "C-16: all write operations skipped — local repo is behind remote"
        )
        return report

    # C-17: backlog guard — if local has unpushed commits from a prior failed push,
    # try to push them before making more. Skipping writes prevents runaway divergence.
    if repo_path:
        ahead = _detect_ahead_of_remote(repo_path)
        if ahead > 0:
            push_ok, push_msg = _git_push(repo_path)
            if not push_ok:
                report.add(
                    severity="WARN", check_id="C-17",
                    message=(
                        f"Repo '{repo_slug}' has {ahead} unpushed commit(s) and push "
                        f"failed ({push_msg}) — skipping writes to prevent runaway divergence"
                    ),
                    fixable=False,
                )
                report.fixes_applied.append(
                    "C-17: all write operations skipped — unpushed commits, push failed"
                )
                return report
            # Push succeeded — local is now in sync; proceed normally
            report.fixes_applied.append(f"C-17 cleared: pushed {ahead} backlogged commit(s)")

    fixable = [i for i in report.issues if i.fixable]
    for issue in fixable:
        ctx = issue._fix_context
        try:
            if issue.check_id in ("C-04", "C-05", "C-13", "C-19"):
                ws_id = ctx["ws_id"]
                result = _api_patch(api_base, f"/workstreams/{ws_id}",
                                    {ctx["field"]: ctx["value"]})
                if result is not None and "_error" not in result:
                    report.fixes_applied.append(
                        f"{issue.check_id} fixed: workstream {ws_id[:8]}… "
                        f"{ctx['field']} → {ctx['value']!r}"
                    )
                elif result is not None:
                    report.fixes_applied.append(
                        f"{issue.check_id} FAILED: workstream {ws_id[:8]}… "
                        f"{ctx['field']} → {ctx['value']!r}: {result['_error']}"
                    )

            elif issue.check_id == "C-23":
                ws_id = ctx["ws_id"]
                target_status = ctx["target_status"]
                result = _api_patch(api_base, f"/workstreams/{ws_id}", {"status": target_status})
                if result is not None and "_error" not in result:
                    report.fixes_applied.append(
                        f"C-23 fixed: workstream {ws_id[:8]}… status → {target_status!r}"
                    )
                elif result is not None:
                    report.fixes_applied.append(
                        f"C-23 FAILED: workstream {ws_id[:8]}… "
                        f"status → {target_status!r}: {result['_error']}"
                    )
                # The file repair below runs regardless of the DB outcome.
                if no_writeback:
                    report.fixes_applied.append(
                        f"C-23 skipped file repair (--no-writeback): {Path(ctx['wp_file']).name}"
                    )
                else:
                    wp_file = Path(ctx["wp_file"])
                    old_status = ctx["file_status"]
                    if _patch_frontmatter_field(wp_file, "status", target_status):
                        committed = _git_commit_writeback(
                            repo_path,
                            wp_file,
                            [f"workplan status: {old_status} → {target_status}"],
                            subject="chore(consistency): renormalize lifecycle state [auto]",
                        )
                        suffix = " (committed)" if committed else " (file patched, commit failed)"
                        report.fixes_applied.append(
                            f"C-23 fixed: {wp_file.name} status "
                            f"{old_status} → {target_status}{suffix}"
                        )
                    else:
                        report.fixes_applied.append(
                            f"C-23 SKIP: {wp_file.name} already has status {target_status!r}"
                        )

            elif issue.check_id == "C-06":
                wp_file = Path(ctx["wp_file"])
                meta = ctx["meta"]
                domain = ctx["domain"]
                repo_id_val = ctx["repo_id"]
                body = ctx.get("body", "")
                wp_id = str(meta.get("id", "")).strip()
                title = str(meta.get("title", "")).strip()
                status = str(meta.get("status", "active")).strip()
                status = normalise_workstream_status(status)
                if status not in VALID_WP_STATUSES:
                    status = "active"
                # Find topic_id for this domain
                topics = _api_get(api_base, "/topics")
                topic_id = None
                if isinstance(topics, list):
                    for t in topics:
                        if t.get("domain_slug") == domain:
                            topic_id = t["id"]
                            break
                if topic_id is None:
                    report.fixes_applied.append(
                        f"C-06 SKIP {wp_id}: no topic found for domain '{domain}'"
                    )
                    continue
                slug = re.sub(r"[^a-z0-9-]", "-", wp_id.lower()).strip("-")
                ws_data = _api_post(api_base, "/workstreams", {
                    "topic_id": topic_id,
                    "repo_id": repo_id_val,
                    "slug": slug,
                    "title": title or wp_id,
                    "status": status,
                    "owner": str(meta.get("owner", "")).strip() or None,
                    "planning_priority": str(meta.get("planning_priority", "")).strip() or None,
                    "planning_order": _as_int_or_none(meta.get("planning_order")),
                })
                # An error payload carries no "id"; report it instead of
                # letting the lookup below fail with a bare KeyError.
                if ws_data is None or "_error" in ws_data:
                    detail = f": {ws_data['_error']}" if ws_data else ""
                    report.fixes_applied.append(
                        f"C-06 FAIL {wp_id}: could not create workstream in DB{detail}"
                    )
                    continue
                new_ws_id = ws_data["id"]
                _add_frontmatter_field(wp_file, "state_hub_workstream_id", new_ws_id)
                report.fixes_applied.append(
                    f"C-06 fixed: created workstream {new_ws_id[:8]}… "
                    f"for {wp_id}, wrote ID to {wp_file.name}"
                )
                # Create tasks and inject IDs
                for task in get_tasks_from_workplan(meta, body):
                    if task.get("_parse_error"):
                        continue
                    t_id = str(task.get("id", "")).strip()
                    if not t_id:
                        continue
                    t_db_id, err = _fix_create_linked_task(api_base, new_ws_id, task, wp_file)
                    if t_db_id is not None:
                        report.fixes_applied.append(f" + task {t_id} → {t_db_id[:8]}…")
                    else:
                        report.fixes_applied.append(f" + task {t_id} FAILED: {err}")

            elif issue.check_id == "C-09":
                ws_id = ctx["ws_id"]
                correct_repo_id = ctx["correct_repo_id"]
                result = _api_patch(api_base, f"/workstreams/{ws_id}",
                                    {"repo_id": correct_repo_id})
                if result is not None and "_error" not in result:
                    report.fixes_applied.append(
                        f"C-09 fixed: workstream {ws_id[:8]}… "
                        f"repo_id → {correct_repo_id[:8]}…"
                    )
                elif result is not None:
                    report.fixes_applied.append(
                        f"C-09 FAILED: workstream {ws_id[:8]}… "
                        f"repo_id → {correct_repo_id[:8]}…: {result['_error']}"
                    )

            elif issue.check_id == "C-20":
                from_workstream_id = ctx["from_workstream_id"]
                body = {
                    "to_workstream_id": ctx.get("to_workstream_id"),
                    "to_task_id": ctx.get("to_task_id"),
                    "relationship_type": ctx["relationship_type"],
                }
                result = _api_post(api_base, f"/workstreams/{from_workstream_id}/dependencies", body)
                if result is not None and "_error" not in result:
                    target = ctx.get("to_workstream_id") or ctx.get("to_task_id")
                    report.fixes_applied.append(
                        f"C-20 fixed: dependency {from_workstream_id[:8]}… "
                        f"{ctx['relationship_type']} → {str(target)[:8]}…"
                    )
                elif result is not None:
                    report.fixes_applied.append(
                        f"C-20 FAILED: {result['_error']}"
                    )

            elif issue.check_id == "C-10":
                task_id = ctx["task_id"]
                status = ctx["status"]
                result = _api_patch(api_base, f"/tasks/{task_id}",
                                    {"status": status, "suppress_token_event": True})
                if result is not None and "_error" not in result:
                    report.fixes_applied.append(
                        f"C-10 fixed: task {task_id[:8]}… status → {status!r}"
                    )
                elif result is not None:
                    report.fixes_applied.append(
                        f"C-10 FAILED: task {task_id[:8]}… status → {status!r}: {result['_error']}"
                    )

            elif issue.check_id == "C-11":
                ws_id = ctx["ws_id"]
                ws_status = ctx.get("ws_status", "")
                task = ctx["task"]
                wp_file = Path(ctx["wp_file"])
                t_id = str(task.get("id", "")).strip()
                # Skip creating tasks for finished workstreams — the workstream is
                # done/archived so unlinked tasks are stale file artefacts, not gaps.
                if normalise_workstream_status(ws_status) in CLOSED_WORKSTREAM_STATUSES:
                    report.fixes_applied.append(
                        f"C-11 skipped: task '{t_id}' in {ws_status} workstream — not created"
                    )
                else:
                    t_db_id, err = _fix_create_linked_task(api_base, ws_id, task, wp_file)
                    if t_db_id is not None:
                        report.fixes_applied.append(
                            f"C-11 fixed: task '{t_id}' → {t_db_id[:8]}…"
                        )
                    else:
                        report.fixes_applied.append(
                            f"C-11 FAILED: task '{t_id}': {err}"
                        )

            elif issue.check_id == "C-12":
                task_id = ctx["task_id"]
                if ctx.get("ws_finished"):
                    result = _api_patch(api_base, f"/tasks/{task_id}", {"status": "cancel"})
                    if result is not None and "_error" not in result:
                        report.fixes_applied.append(
                            f"C-12 fixed: orphan task {task_id[:8]}… canceled (workstream finished)"
                        )
                    elif result is not None:
                        report.fixes_applied.append(
                            f"C-12 FAILED: orphan task {task_id[:8]}… cancel: {result['_error']}"
                        )

            elif issue.check_id == "C-22":
                task_id = ctx["task_id"]
                description = ctx["description"]
                result = _api_patch(api_base, f"/tasks/{task_id}", {"description": description})
                if result is not None and "_error" not in result:
                    report.fixes_applied.append(
                        f"C-22 fixed: task {task_id[:8]}… description updated"
                    )
                elif result is not None:
                    report.fixes_applied.append(
                        f"C-22 FAILED: task {task_id[:8]}… description update: {result['_error']}"
                    )

            elif issue.check_id == "C-15":
                # T03 — writeback: DB is ahead of file — patch file to match DB.
                if no_writeback:
                    report.fixes_applied.append(
                        f"C-15 skipped (--no-writeback): task '{ctx['task_block_id']}' "
                        f"db={ctx['db_status']!r}"
                    )
                else:
                    wp_file = Path(ctx["wp_file"])
                    task_block_id = ctx["task_block_id"]
                    db_status = ctx["db_status"]
                    old_status = issue.file_value
                    if _patch_task_status_in_file(wp_file, task_block_id, db_status):
                        committed = _git_commit_writeback(
                            repo_path,
                            wp_file,
                            [f"{task_block_id}: {old_status} → {db_status}"],
                        )
                        suffix = " (committed)" if committed else " (file patched, commit failed)"
                        report.fixes_applied.append(
                            f"C-15 fixed: task '{task_block_id}' "
                            f"{old_status} → {db_status}{suffix}"
                        )
                    else:
                        report.fixes_applied.append(
                            f"C-15 SKIP: could not locate task block '{task_block_id}' "
                            f"in {wp_file.name}"
                        )
        except Exception as e:
            # Best-effort boundary: one failing fix is recorded and the run
            # moves on to the next issue.
            report.fixes_applied.append(f"{issue.check_id} ERROR: {e}")

    # Record that a sync run happened for this repo
    from datetime import timezone as _tz
    import datetime as _dt
    now_iso = _dt.datetime.now(_tz.utc).isoformat()
    _api_patch(api_base, f"/repos/{repo_slug}", {"last_state_synced_at": now_iso})

    # Write the worker orientation brief (.custodian-brief.md)
    if repo_path:
        brief_written = _write_custodian_brief(api_base, repo_slug, repo_path)
        if brief_written:
            report.fixes_applied.append("brief: .custodian-brief.md updated")

    # Push all commits made this run (writebacks + brief) to close the loop.
    # This ensures the next run starts with local == remote, making the
    # timer idempotent and preventing commit pile-up.
    if repo_path:
        push_ok, push_msg = _git_push(repo_path)
        if push_ok:
            report.fixes_applied.append(f"push: {push_msg}")
        else:
            report.fixes_applied.append(f"push WARN: {push_msg}")
    return report
# Check IDs that are known background noise in multi-machine setups:
#   C-08 = finished/archived DB workstream with no file (pre-ADR-001 legacy)
# Issues carrying only these IDs do not warrant a pull+fix cycle; the set is
# consulted when deciding whether a repo needs action.
_BACKGROUND_CHECKS: frozenset[str] = frozenset({"C-08"})
def _report_needs_action(
    report: ConsistencyReport, behind_remote: bool, ahead_of_remote: int = 0
) -> bool:
    """Decide whether a repo warrants a pull+fix cycle.

    Action is needed as soon as any of the following holds:
      - the repo is behind its remote tracking branch,
      - it carries unpushed local commits,
      - the report contains a FAIL issue,
      - some WARN/INFO issue falls outside the background-noise set
        (``_BACKGROUND_CHECKS``).

    Only when none of these apply is the repo treated as clean.
    """
    out_of_sync = behind_remote or ahead_of_remote > 0
    if out_of_sync or report.failures:
        return True
    soft_issues = report.warnings + report.infos
    return any(
        issue.check_id not in _BACKGROUND_CHECKS for issue in soft_issues
    )
def fix_all_remote(
    api_base: str,
    no_writeback: bool = False,
    max_seconds: int = DEFAULT_REMOTE_ALL_MAX_SECONDS,
) -> list[ConsistencyReport]:
    """Pull-then-fix all registered repos that need attention.

    For each repo with a resolvable local path on this machine:
      1. Skip if the path does not exist (repo lives on another machine).
      2. Run check_repo() and the behind/ahead-of-remote probes.
      3. If the repo is clean (no actionable issues, not behind remote): skip.
      4. Otherwise: git pull --ff-only, then fix_repo() — also when the pull
         itself failed, in which case the failure is noted in the report.

    Args:
        api_base: Base URL handed to the API helpers.
        no_writeback: Forwarded to ``fix_repo``.
        max_seconds: Time budget for the whole pass; once exceeded, the
            remaining repos are skipped.  Values <= 0 disable the budget.

    Returns:
        One ConsistencyReport per repo that was actually fixed.  Repos skipped
        as clean, missing on this host, or over budget are printed to stdout
        but not included in the return value.
    """
    repos = _api_get(api_base, "/repos")
    if not isinstance(repos, list):
        print("ERROR: Could not fetch repos from state-hub API", file=sys.stderr)
        return []

    started = time.monotonic()
    reports: list[ConsistencyReport] = []
    skipped_clean: list[str] = []
    skipped_missing: list[str] = []
    skipped_budget: list[str] = []

    # enumerate() gives the position directly; looking it up with
    # repos.index(repo) rescans the list and finds the wrong entry when two
    # repo records compare equal.
    for position, repo in enumerate(repos):
        slug = repo["slug"]
        if max_seconds > 0 and time.monotonic() - started > max_seconds:
            skipped_budget.append(slug)
            skipped_budget.extend(r.get("slug", "?") for r in repos[position + 1:])
            break

        # Resolve path using the same priority as check_repo
        path = resolve_repo_path(repo)
        if not path or not Path(path).is_dir():
            skipped_missing.append(slug)
            continue

        # Read-only pass: detect issues and remote staleness.
        # _detect_behind_remote does a fetch, so _detect_ahead_of_remote
        # after it sees an up-to-date tracking branch.
        pre_report = check_repo(api_base, slug)
        behind = _detect_behind_remote(path)
        ahead = _detect_ahead_of_remote(path)
        if not _report_needs_action(pre_report, behind, ahead):
            skipped_clean.append(slug)
            continue

        # Pull before fixing
        pull_ok, pull_msg = _git_pull(path)
        pull_tag = f"pull: {pull_msg}"
        if not pull_ok:
            pull_tag = f"pull WARN: {pull_msg} — proceeding anyway"
        report = fix_repo(api_base, slug, no_writeback=no_writeback)
        report.fixes_applied.insert(0, pull_tag)
        reports.append(report)

    # Print clean/missing summary lines before the detailed reports
    if skipped_clean:
        print(f" CLEAN (skipped): {', '.join(skipped_clean)}")
    if skipped_missing:
        print(f" NOT ON THIS HOST (skipped): {', '.join(skipped_missing)}")
    if skipped_budget:
        print(
            f" BUDGET EXHAUSTED after {max_seconds}s (skipped): "
            f"{', '.join(skipped_budget)}"
        )
    if skipped_clean or skipped_missing or skipped_budget:
        print()
    return reports
def archive_closed_workplans(
    repo_path: str,
    completion_date: str | None = None,
    workplan: str | None = None,
) -> list[str]:
    """Relocate closed root-level workplans to workplans/archived/.

    A file is moved only when its frontmatter parses, its status normalises
    to a closed workstream status, and none of its tasks is still open.  The
    archived name is ``<prefix>-<canonical filename>``, where the prefix is
    *completion_date* or, if that is not given, today's date as YYMMDD.

    When *workplan* is given, only the file whose frontmatter id or file stem
    equals it (a trailing ``.md`` is ignored) is considered.

    Returns one ``"<old> -> <new>"`` entry (paths relative to the repo) per
    moved file.  Raises FileExistsError if the archive target already exists.
    """
    repo_root = Path(repo_path)
    plans_root = repo_root / "workplans"
    archive_root = plans_root / "archived"
    if not plans_root.is_dir():
        return []

    prefix = completion_date if completion_date else datetime.now().strftime("%y%m%d")
    archive_root.mkdir(exist_ok=True)
    # The selector does not depend on the file being inspected, so derive it once.
    wanted = workplan.removesuffix(".md") if workplan else None

    results: list[str] = []
    for candidate in sorted(plans_root.glob("*.md")):
        raw = candidate.read_text(encoding="utf-8")
        if not raw.startswith("---"):
            continue
        meta, body = parse_frontmatter(raw)
        if not meta or meta.get("_parse_error"):
            continue

        if wanted is not None:
            aliases = {str(meta.get("id", "")), candidate.stem, candidate.name}
            if wanted not in aliases:
                continue

        plan_status = normalise_workstream_status(str(meta.get("status", "")).strip())
        if plan_status not in CLOSED_WORKSTREAM_STATUSES:
            continue

        # Leave the file in place while any task is still non-terminal.
        has_open_task = any(
            normalise_task_status(entry.get("status", "todo")) not in TERMINAL_TASK_STATUSES
            for entry in get_tasks_from_workplan(meta, body)
        )
        if has_open_task:
            continue

        destination = archive_root / f"{prefix}-{canonical_workplan_filename(candidate)}"
        if destination.exists():
            raise FileExistsError(f"Archived workplan already exists: {destination}")
        candidate.rename(destination)
        results.append(
            f"{candidate.relative_to(repo_root)} -> {destination.relative_to(repo_root)}"
        )
    return results
# ---------------------------------------------------------------------------
# Output / rendering
# ---------------------------------------------------------------------------
def render_renormalization_guide() -> str:
    """Render the renormalization rules and the next-guard checklist as text.

    Emits a title with an underline, one blank-line-separated stanza per entry
    in ``RENORMALIZATION_RULES`` (check id and invariant, then detect / repair /
    tests lines), followed by the numbered
    ``RENORMALIZATION_NEXT_GUARD_CHECKLIST``. Lines are joined with newlines.
    """
    out = ["Lifecycle Renormalization Rules", "=" * 33]

    for rule in RENORMALIZATION_RULES:
        out.append("")
        out.append(f"{rule.check_id}: {rule.invariant}")
        out.append(f" Detect: {rule.detection}")
        out.append(f" Repair: {rule.repair}")
        out.append(f" Tests: {rule.test_anchor}")

    out += ["", "Add The Next Guard"]
    out += [
        f" {number}. {step}"
        for number, step in enumerate(RENORMALIZATION_NEXT_GUARD_CHECKLIST, start=1)
    ]
    return "\n".join(out)
def render_text(report: ConsistencyReport, show_info: bool = True) -> str:
    """Format one consistency report as a human-readable text block.

    Layout: a header (title, repo slug, repo path), one section per severity
    that has issues (FAIL, WARN, then INFO unless ``show_info`` is false), an
    optional list of applied fixes, and a footer with the counts and the
    overall result line.

    Args:
        report: Report exposing ``repo_slug``, ``repo_path``, ``issues``,
            ``fixes_applied``, ``failures``, ``warnings`` and ``infos``.
        show_info: When false, the INFO section is omitted; the INFO count in
            the footer is still printed.

    Returns:
        The rendered report, lines joined with newlines.
    """
    rule = "=" * 66
    out = [
        "ADR-001 Consistency Report",
        f"Repo: {report.repo_slug}",
        f"Path: {report.repo_path}",
        rule,
    ]

    severities = ("FAIL", "WARN", "INFO") if show_info else ("FAIL", "WARN")
    for severity in severities:
        matching = [issue for issue in report.issues if issue.severity == severity]
        if not matching:
            continue
        out.append(f"\n {severity}S ({len(matching)}):")
        for issue in matching:
            location = f" [{issue.file_path}]" if issue.file_path else ""
            fixable_tag = " [fixable]" if issue.fixable else ""
            out.append(f" {issue.check_id}{location}{fixable_tag}")
            out.append(f" {issue.message}")
            # Only show the value pair when at least one side has a value.
            if issue.file_value or issue.db_value:
                out.append(f" file={issue.file_value!r} db={issue.db_value!r}")

    applied = report.fixes_applied
    if applied:
        out.append(f"\n FIXES APPLIED ({len(applied)}):")
        out.extend(f" {entry}" for entry in applied)

    out.append(f"\n{rule}")
    fail_count = len(report.failures)
    warn_count = len(report.warnings)
    info_count = len(report.infos)
    out.append(f" {fail_count} fail | {warn_count} warn | {info_count} info")

    if fail_count:
        verdict = " RESULT: ✗ FAIL"
    elif warn_count:
        verdict = " RESULT: ✓ PASS (with warnings)"
    else:
        verdict = " RESULT: ✓ PASS"
    out.append(verdict)
    out.append(rule)
    return "\n".join(out)
def report_to_dict(report: ConsistencyReport) -> dict:
    """Convert a consistency report into the plain dict used for JSON output.

    The result carries the repo identity, one dict per issue, the list of
    applied fixes, per-severity counts under ``"summary"``, and an overall
    ``"result"`` of ``"fail"``, ``"warn"`` or ``"pass"`` (failures take
    precedence over warnings).
    """
    issue_fields = (
        "severity",
        "check_id",
        "message",
        "file_path",
        "db_id",
        "file_value",
        "db_value",
        "fixable",
    )
    issues = [
        {name: getattr(issue, name) for name in issue_fields}
        for issue in report.issues
    ]

    if report.failures:
        result = "fail"
    elif report.warnings:
        result = "warn"
    else:
        result = "pass"

    summary = {
        "fail": len(report.failures),
        "warn": len(report.warnings),
        "info": len(report.infos),
    }
    return {
        "repo_slug": report.repo_slug,
        "repo_path": report.repo_path,
        "issues": issues,
        "fixes_applied": report.fixes_applied,
        "summary": summary,
        "result": result,
    }
def consistency_exit_code(reports: list[ConsistencyReport], *, remote_all: bool = False) -> int:
    """Return the strict CLI exit code for consistency reports.

    ``1`` when any report has failures; otherwise ``2`` when any report has
    warnings, unless ``remote_all`` is set, in which case warnings alone
    still yield ``0``. ``0`` when there is nothing to flag.
    """
    has_failures = any(report.failures for report in reports)
    has_warnings = any(report.warnings for report in reports)

    if has_failures:
        return 1
    # In --remote --all mode warnings do not make the run non-zero.
    if remote_all or not has_warnings:
        return 0
    return 2
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
def main() -> None:
    """CLI entry point: parse arguments, run checks or fixes, print, and exit.

    Exactly one mode is required: ``--repo SLUG``, ``--all``, ``--here [PATH]``
    or ``--renormalization-guide``. ``--fix`` (implied by ``--remote``) applies
    fixes instead of only checking. ``--archive-closed`` moves closed workplans
    after the check/fix pass, except in ``--remote --all`` mode where it is not
    applied.

    Reports are printed to stdout as text, or as JSON with ``--json``. In JSON
    mode the informational status lines emitted here go to stderr so that
    stdout stays a single parseable JSON document.

    Always terminates through ``sys.exit``; for normal runs the code comes
    from ``consistency_exit_code``.
    """
    parser = argparse.ArgumentParser(
        description="ADR-001 consistency checker — bidirectional file↔DB validation",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--repo", metavar="SLUG",
                       help="Registered repo slug (e.g. the-custodian)")
    group.add_argument("--all", action="store_true",
                       help="Run checks against all repos with a resolvable path on this host")
    group.add_argument("--here", metavar="PATH", nargs="?", const="",
                       help="Infer repo slug from git remote URL at PATH (default: CWD)")
    group.add_argument("--renormalization-guide", action="store_true",
                       help="Print lifecycle renormalization rules and the add-next-guard checklist")
    parser.add_argument("--fix", action="store_true",
                        help="Apply auto-fixable issues (status drift, repo mismatch, etc.)")
    parser.add_argument("--remote", action="store_true",
                        help="Pull each repo before fixing; when used with --all, skips repos "
                             "that are already clean (no actionable issues, not behind remote). "
                             "Implies --fix.")
    parser.add_argument("--max-seconds", type=int, default=DEFAULT_REMOTE_ALL_MAX_SECONDS,
                        help="Wall-clock budget for --remote --all before remaining repos are skipped "
                             f"(default: {DEFAULT_REMOTE_ALL_MAX_SECONDS}; 0 disables)")
    parser.add_argument("--no-writeback", action="store_true", dest="no_writeback",
                        help="Disable DB→file status writeback (C-15) while keeping other fixes")
    parser.add_argument("--archive-closed", action="store_true",
                        help="Move closed root workplans to workplans/archived/YYMMDD-*.md")
    parser.add_argument("--archive-workplan", metavar="ID_OR_FILE", default=None,
                        help="When archiving, only move the matching workplan id or filename")
    parser.add_argument("--archive-date", metavar="YYMMDD", default=None,
                        help="Completion date prefix for --archive-closed (default: today)")
    parser.add_argument("--repo-path", metavar="PATH", default=None,
                        help="Override the local repo path (useful when the DB has a different "
                             "machine's path). Takes priority over host_paths and local_path.")
    parser.add_argument("--api-base", default="http://127.0.0.1:8000",
                        help="State Hub API base URL")
    parser.add_argument("--json", action="store_true", dest="as_json",
                        help="Output JSON instead of human-readable text")
    args = parser.parse_args()

    if args.renormalization_guide:
        if args.as_json:
            print(json.dumps({
                "rules": [
                    {
                        "check_id": rule.check_id,
                        "invariant": rule.invariant,
                        "detection": rule.detection,
                        "repair": rule.repair,
                        "test_anchor": rule.test_anchor,
                    }
                    for rule in RENORMALIZATION_RULES
                ],
                "add_next_guard": list(RENORMALIZATION_NEXT_GUARD_CHECKLIST),
            }, indent=2))
        else:
            print(render_renormalization_guide())
        sys.exit(0)

    import os as _os

    # argparse always defines the dest, so no getattr() fallback is needed.
    no_wb = args.no_writeback
    do_fix = args.fix or args.remote

    # Progress/status lines must not be interleaved with the JSON document.
    status_stream = sys.stderr if args.as_json else sys.stdout

    def _pull_and_report(path: str) -> None:
        """Pull the repo at *path* and print a one-line status."""
        pull_ok, pull_msg = _git_pull(path)
        prefix = "pull" if pull_ok else "pull WARN"
        print(f" {prefix}: {pull_msg}", file=status_stream)

    # --here: infer slug from git remote URL, then run as single-repo check/fix
    if args.here is not None:
        search_path = args.here or _os.getcwd()
        inferred = _infer_slug_from_path(args.api_base, search_path)
        if inferred is None:
            print(
                f"ERROR: No registered repo with a matching remote_url found at '{search_path}'.",
                file=sys.stderr,
            )
            print(
                " Register the repo first: POST /repos/ with remote_url set.",
                file=sys.stderr,
            )
            sys.exit(1)
        inferred_slug, git_root = inferred
        print(f" Detected: {inferred_slug} ({git_root})", file=status_stream)
        if args.remote:
            # --remote promises a pull before fixing; honour it here as well,
            # matching the single-repo --repo path below.
            _pull_and_report(git_root)
        if do_fix:
            reports = [fix_repo(args.api_base, inferred_slug, git_root, no_writeback=no_wb)]
        else:
            reports = [check_repo(args.api_base, inferred_slug, git_root)]
        if args.archive_closed:
            moved = archive_closed_workplans(git_root, args.archive_date, args.archive_workplan)
            reports[0].fixes_applied.extend(f"archive: {m}" for m in moved)

    # --remote --all: smart pull+fix across all repos
    elif args.remote and args.all:
        with run_lock("consistency-remote-all") as acquired:
            if not acquired:
                print(
                    "SKIP: another fix-consistency-remote --all run is already active",
                    file=sys.stderr,
                )
                sys.exit(0)
            reports = fix_all_remote(
                args.api_base,
                no_writeback=no_wb,
                max_seconds=args.max_seconds,
            )
            if not reports:
                sys.exit(0)

    else:
        # Resolve repo list
        hostname = socket.gethostname()
        repo_slugs: list[str] = []
        if args.all:
            repos = _api_get(args.api_base, "/repos")
            if not isinstance(repos, list):
                print("ERROR: Could not fetch repos from state-hub API", file=sys.stderr)
                sys.exit(1)
            repo_slugs = [
                r["slug"] for r in repos
                if r.get("local_path") or (r.get("host_paths") or {}).get(hostname)
            ]
            if not repo_slugs:
                print(
                    f"No repos with a path registered for host '{hostname}'.",
                    file=sys.stderr,
                )
                sys.exit(0)
        else:
            repo_slugs = [args.repo]

        # --repo-path only applies to single-repo runs; silently ignored with --all
        path_override = args.repo_path if not args.all else None

        if args.remote:
            # Single-repo remote: pull first, then fix
            slug = repo_slugs[0]
            repo = _api_get(args.api_base, f"/repos/{slug}")
            path = resolve_repo_path(repo or {}, path_override) if repo else (path_override or "")
            if path:
                _pull_and_report(path)
            report = fix_repo(args.api_base, slug, path_override, no_writeback=no_wb)
            reports = [report]
        elif do_fix:
            reports = [
                fix_repo(args.api_base, slug, path_override, no_writeback=no_wb)
                for slug in repo_slugs
            ]
        else:
            reports = [check_repo(args.api_base, slug, path_override) for slug in repo_slugs]

        if args.archive_closed:
            for report in reports:
                # Defensive: an empty path would make Path("") resolve to the
                # current working directory, so never archive without one.
                if not report.repo_path:
                    continue
                moved = archive_closed_workplans(
                    report.repo_path, args.archive_date, args.archive_workplan
                )
                report.fixes_applied.extend(f"archive: {m}" for m in moved)

    if args.as_json:
        # A single report is emitted as an object, several as a list.
        output = (
            report_to_dict(reports[0])
            if len(reports) == 1
            else [report_to_dict(r) for r in reports]
        )
        print(json.dumps(output, indent=2))
    else:
        for report in reports:
            print(render_text(report))
            print()

    sys.exit(consistency_exit_code(reports, remote_all=args.remote and args.all))
# Script entry point: run the CLI only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()