Files
the-custodian/state-hub/scripts/consistency_check.py
tegwick 2c8561be70 feat(repos): multi-machine path support via host_paths
Adds a JSONB column `host_paths` to managed_repos mapping
hostname → absolute local path. Fixes the consistency-checker
failure when the same repo lives at different paths on different
machines (e.g. /home/worsch/marki-docx on the workstation vs
/home/tegwick/marki-docx on custodiancore).

Changes:
- Migration g4b5c6d7e8f9: adds host_paths JSONB (default {})
- Model: host_paths Mapped[dict] column
- Schemas: host_paths in RepoRead; new RepoPathRegister schema
- Router: POST /repos/{slug}/paths/ — merges one host entry
- consistency_check.py: resolve_repo_path() prefers host_paths
  [hostname] over local_path; --repo-path CLI override added
- MCP: update_repo_path(slug, path, host?) tool
- Makefile: register-path target; REPO_PATH passthrough on
  check-consistency and fix-consistency targets
- TOOLS.md: documents update_repo_path

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-16 16:30:55 +01:00

932 lines
36 KiB
Python

#!/usr/bin/env python3
"""consistency_check.py — ADR-001 consistency checking engine.
Runs bidirectional checks between workplan files in a registered repo and the
state-hub DB. The file is always authoritative; the DB is the cache/index layer.
Checks:
C-01 workplans-dir FAIL No workplans/ directory missing
C-02 workplan-parse FAIL No Workplan file cannot be parsed
C-03 workstream-stale-ref FAIL No state_hub_workstream_id in file not in DB
C-04 workstream-status-drift WARN Yes File status != DB status (file wins)
C-05 workstream-title-drift WARN Yes File title != DB title (file wins)
C-06 workstream-unlinked WARN Yes Workplan has no state_hub_workstream_id
C-07 orphan-db-active FAIL No Active DB workstream, no backing file
C-08 orphan-db-completed INFO No Completed/archived DB workstream, no file
C-09 workstream-repo-mismatch FAIL Yes DB workstream repo_id != file location
C-10 task-status-drift WARN Yes Task status differs between file and DB
C-11 task-unlinked WARN Yes Task block has no state_hub_task_id
C-12 orphan-db-task WARN No DB task in workstream has no file backing
C-13 workstream-auto-complete WARN Yes All DB tasks done but workstream still active
Usage:
python scripts/consistency_check.py --repo SLUG [--repo-path PATH] [--fix] [--json] [--api-base URL]
python scripts/consistency_check.py --all [--fix] [--json] [--api-base URL]
Exit codes:
0 — ok (no FAILs and no WARNs; INFOs at most)
1 — one or more FAILs present
2 — warn-only (no FAILs, but WARNs present)
"""
from __future__ import annotations
import argparse
import json
import re
import socket
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
try:
import yaml as _yaml
_HAS_YAML = True
except ImportError:
_HAS_YAML = False
try:
import httpx as _httpx
_HAS_HTTPX = True
except ImportError:
_HAS_HTTPX = False
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Matches a fenced ```task block; group 1 is the text between the fences.
# re.DOTALL lets the non-greedy body span several lines.
_TASK_BLOCK_RE = re.compile(r"```task\s*\n(.*?)\n```", re.DOTALL)
# Allowed status / priority vocabularies. The task sets are used by the fix
# engine to sanitise file values before they are POSTed to the API.
VALID_WP_STATUSES = {"active", "completed", "archived"}
VALID_TASK_STATUSES = {"todo", "in_progress", "blocked", "done", "cancelled"}
VALID_TASK_PRIORITIES = {"low", "medium", "high", "critical"}
# Workplan files use task-style vocabulary ("done"); the DB workstream API uses
# "completed". This map translates file values to DB values before comparison
# and before PATCHing, so "done" vs "completed" is never flagged as C-04 drift.
FILE_TO_DB_WORKSTREAM_STATUS: dict[str, str] = {
    "done": "completed",
}
def normalise_workstream_status(status: str) -> str:
    """Map a workplan-file status onto the vocabulary the DB uses.

    Values without an entry in ``FILE_TO_DB_WORKSTREAM_STATUS`` are returned
    unchanged.
    """
    if status in FILE_TO_DB_WORKSTREAM_STATUS:
        return FILE_TO_DB_WORKSTREAM_STATUS[status]
    return status
# ---------------------------------------------------------------------------
# Data types
# ---------------------------------------------------------------------------
@dataclass
class Issue:
    """A single finding produced by one consistency check."""

    severity: str  # FAIL | WARN | INFO
    check_id: str  # C-00 through C-13
    message: str
    # Workplan file name, optionally suffixed with "#<task id>" for task findings.
    file_path: str = ""
    # ID of the DB workstream or task the finding refers to, when there is one.
    db_id: str = ""
    # The two conflicting values for drift-style findings.
    file_value: str = ""
    db_value: str = ""
    # True when fix_repo() knows how to repair this finding.
    fixable: bool = False
    # Data fix_repo() needs to apply the repair; excluded from repr() and not
    # part of the dict produced by report_to_dict().
    _fix_context: dict = field(default_factory=dict, repr=False)
@dataclass
class ConsistencyReport:
    """All findings for one repo, plus a log of the fixes that were attempted."""

    repo_slug: str
    repo_path: str
    issues: list[Issue] = field(default_factory=list)
    fixes_applied: list[str] = field(default_factory=list)

    def add(self, **kwargs) -> Issue:
        """Create an Issue from keyword arguments, record it and return it."""
        created = Issue(**kwargs)
        self.issues.append(created)
        return created

    def _with_severity(self, level: str) -> list[Issue]:
        """Return the recorded issues whose severity equals *level*."""
        return [entry for entry in self.issues if entry.severity == level]

    @property
    def failures(self) -> list[Issue]:
        """Issues with severity FAIL."""
        return self._with_severity("FAIL")

    @property
    def warnings(self) -> list[Issue]:
        """Issues with severity WARN."""
        return self._with_severity("WARN")

    @property
    def infos(self) -> list[Issue]:
        """Issues with severity INFO."""
        return self._with_severity("INFO")
# ---------------------------------------------------------------------------
# YAML / frontmatter parsing
# ---------------------------------------------------------------------------
def _parse_yaml_block(raw: str) -> dict:
    """Parse *raw* as a YAML mapping.

    With PyYAML installed the text is loaded with ``safe_load``. Empty input
    yields ``{}``. Text that fails to parse, or that parses to something other
    than a mapping (a list, a bare scalar, ...), yields
    ``{"_parse_error": True}`` so callers can keep using ``.get()`` on the
    result.

    Without PyYAML a minimal fallback is used: every unindented, non-comment
    ``key: value`` line becomes an entry, with the value kept as a string and
    surrounding quotes removed. Nested structures are ignored by the fallback.
    """
    if _HAS_YAML:
        try:
            loaded = _yaml.safe_load(raw) or {}
        except _yaml.YAMLError:
            return {"_parse_error": True}
        # safe_load happily returns lists/strings/numbers for valid YAML that is
        # not a mapping; callers assume a dict, so flag that as unparsable.
        if not isinstance(loaded, dict):
            return {"_parse_error": True}
        return loaded

    result: dict = {}
    for line in raw.splitlines():
        # Skip nested (indented) lines and comments; only top-level pairs count.
        if ":" not in line or line.startswith((" ", "#")):
            continue
        k, _, v = line.partition(":")
        result[k.strip()] = v.strip().strip('"').strip("'")
    return result
# Frontmatter = an opening line that is exactly "---", the YAML text, and a
# closing line that is exactly "---" (trailing blanks tolerated). Group 1 is
# the YAML text. Matching whole lines keeps a "---" inside a value (for
# example in a title) from being mistaken for the closing delimiter.
_FRONTMATTER_RE = re.compile(
    r"---[ \t]*\r?\n(.*?)^---[ \t]*(?=\r?$)",
    re.DOTALL | re.MULTILINE,
)


def parse_frontmatter(text: str) -> tuple[dict, str]:
    """Split YAML frontmatter from the body of a workplan file.

    Args:
        text: Full file content.

    Returns:
        ``(meta, body)`` where *meta* is the parsed frontmatter mapping and
        *body* is everything after the closing ``---`` (starting with the line
        break that follows it). When the text has no opening delimiter line or
        no closing delimiter line, returns ``({}, text)``.
    """
    match = _FRONTMATTER_RE.match(text)
    if match is None:
        return {}, text
    meta = _parse_yaml_block(match.group(1).strip())
    return meta, text[match.end():]
def parse_task_blocks(body: str) -> list[dict]:
    """Return the parsed YAML content of every ```task fenced block in *body*.

    Blocks are returned in document order; each one is parsed with
    ``_parse_yaml_block`` after surrounding whitespace is trimmed.
    """
    parsed: list[dict] = []
    for fence in _TASK_BLOCK_RE.finditer(body):
        parsed.append(_parse_yaml_block(fence.group(1).strip()))
    return parsed
def get_tasks_from_workplan(meta: dict, body: str) -> list[dict]:
    """Collect the tasks declared by a workplan.

    Fenced ```task blocks in the body take precedence. If there are none, a
    ``tasks:`` list in the frontmatter (activity-core style) is used. Anything
    else yields an empty list.
    """
    fenced = parse_task_blocks(body)
    if not fenced:
        embedded = meta.get("tasks")
        return embedded if isinstance(embedded, list) else []
    return fenced
# ---------------------------------------------------------------------------
# File update helpers
# ---------------------------------------------------------------------------
def _add_frontmatter_field(file_path: Path, key: str, value: str) -> None:
"""Insert key: "value" into frontmatter before the closing --- line."""
text = file_path.read_text(encoding="utf-8")
lines = text.split("\n")
close_idx = None
for i, line in enumerate(lines[1:], 1):
if line.strip() == "---":
close_idx = i
break
if close_idx is None:
return
lines.insert(close_idx, f'{key}: "{value}"')
file_path.write_text("\n".join(lines), encoding="utf-8")
def _inject_task_id_into_block(
    file_path: Path, field_name: str, field_value: str, match_id: str
) -> bool:
    """Set *field_name* inside the ```task block whose ``id`` equals *match_id*.

    A block is only touched when the field is absent or holds an empty
    placeholder. An existing ``field_name:`` line is overwritten; otherwise the
    new line is appended at the end of the block.

    Returns:
        True when the file content changed and was written back, else False.
    """
    placeholders = ("", "~", "null", "None", "none")
    assignment = f'{field_name}: "{field_value}"'
    field_line = re.compile(rf"^{re.escape(field_name)}:.*$", re.MULTILINE)
    original = file_path.read_text(encoding="utf-8")

    def rewrite(found: re.Match) -> str:
        inner = found.group(1)
        parsed = _parse_yaml_block(inner.strip())
        if str(parsed.get("id", "")) != match_id:
            return found.group(0)
        current = parsed.get(field_name)
        already_set = current is not None and str(current).strip() not in placeholders
        if already_set:
            return found.group(0)
        # Prefer overwriting a null/~ line; fall back to appending the field.
        updated = field_line.sub(assignment, inner)
        if updated == inner:
            updated = inner.rstrip() + f"\n{field_name}: \"{field_value}\""
        return f"```task\n{updated}\n```"

    rewritten = _TASK_BLOCK_RE.sub(rewrite, original)
    if rewritten == original:
        return False
    file_path.write_text(rewritten, encoding="utf-8")
    return True
def _inject_task_id_frontmatter_list(
    file_path: Path, field_value: str, match_id: str
) -> bool:
    """Set ``state_hub_task_id`` on a task entry of the frontmatter ``tasks:`` list.

    Every list entry whose ``id`` equals *match_id* and whose
    ``state_hub_task_id`` is missing or an empty placeholder (null, ``~``,
    ``""``, ...) receives *field_value*. The frontmatter is then re-serialised,
    so comments and custom formatting inside it are not preserved; key order is
    kept where the installed PyYAML supports ``sort_keys``.

    Args:
        file_path: Workplan file to modify in place (UTF-8).
        field_value: The DB task ID to store.
        match_id: File-level task ``id`` to look for.

    Returns:
        True when the file was rewritten; False when PyYAML is unavailable,
        nothing matched, or writing/serialising failed.
    """
    if not _HAS_YAML:
        return False
    import yaml

    text = file_path.read_text(encoding="utf-8")
    meta, body = parse_frontmatter(text)
    tasks = meta.get("tasks") if isinstance(meta, dict) else None
    if not isinstance(tasks, list):
        return False

    changed = False
    for entry in tasks:
        if not isinstance(entry, dict) or str(entry.get("id", "")) != match_id:
            continue
        current = entry.get("state_hub_task_id")
        # A present-but-null key counts as unlinked, mirroring how the checker
        # and the ```task block injector treat placeholders.
        if current is None or str(current).strip() in ("", "~", "null", "None", "none"):
            entry["state_hub_task_id"] = field_value
            changed = True
    if not changed:
        return False

    try:
        try:
            dumped = yaml.safe_dump(
                meta, allow_unicode=True, default_flow_style=False, sort_keys=False
            )
        except TypeError:
            # Older PyYAML releases do not accept sort_keys.
            dumped = yaml.safe_dump(meta, allow_unicode=True, default_flow_style=False)
        file_path.write_text(f"---\n{dumped}---{body}", encoding="utf-8")
    except (yaml.YAMLError, OSError, TypeError, ValueError):
        # Best effort: the caller treats False as "could not write the ID back".
        return False
    return True
# ---------------------------------------------------------------------------
# API helpers
# ---------------------------------------------------------------------------
def _api_get(api_base: str, path: str, params: dict | None = None) -> Any:
    """GET *path* from the API and return the decoded JSON body.

    A trailing slash is appended to *path* when missing and query parameters
    whose value is None are dropped. Returns None when httpx is not installed
    or when the request fails for any reason (including a non-2xx status).
    """
    if not _HAS_HTTPX:
        return None
    url_path = path if path.endswith("/") else path + "/"
    try:
        with _httpx.Client(base_url=api_base, timeout=10.0, follow_redirects=True) as client:
            query = {}
            for name, val in (params or {}).items():
                if val is not None:
                    query[name] = val
            response = client.get(url_path, params=query)
            response.raise_for_status()
            return response.json()
    except Exception:
        return None
def _api_patch(api_base: str, path: str, body: dict) -> Any:
    """PATCH *path* with a JSON *body* and return the decoded JSON response.

    A trailing slash is appended to *path* when missing. Returns None when
    httpx is not installed. On any request failure a ``{"_error": "<text>"}``
    dict is returned instead of raising, so callers can tell a failed fix from
    a successful one and report it.
    """
    if not _HAS_HTTPX:
        return None
    url_path = path if path.endswith("/") else path + "/"
    try:
        with _httpx.Client(base_url=api_base, timeout=10.0, follow_redirects=True) as client:
            response = client.patch(url_path, json=body)
            response.raise_for_status()
            return response.json()
    except Exception as failure:
        # Sentinel instead of None: "API error" must stay distinguishable
        # from "nothing attempted".
        return {"_error": str(failure)}
def _api_post(api_base: str, path: str, body: dict) -> Any:
    """POST a JSON *body* to *path* and return the decoded JSON response.

    A trailing slash is appended to *path* when missing. Returns None when
    httpx is not installed or when the request fails for any reason.
    """
    if not _HAS_HTTPX:
        return None
    url_path = path if path.endswith("/") else path + "/"
    try:
        with _httpx.Client(base_url=api_base, timeout=10.0, follow_redirects=True) as client:
            response = client.post(url_path, json=body)
            response.raise_for_status()
            return response.json()
    except Exception:
        return None
# ---------------------------------------------------------------------------
# Core check engine
# ---------------------------------------------------------------------------
def resolve_repo_path(repo: dict, override: str | None = None) -> str:
    """Work out where a repo lives on the machine running this script.

    Lookup order:
    1. *override* (the --repo-path CLI value), when non-empty
    2. ``repo["host_paths"][<current hostname>]`` — the per-machine path
       registered via POST /repos/{slug}/paths/
    3. ``repo["local_path"]`` — legacy single-path field (backward compat)

    Returns an empty string when none of them yields a path.
    """
    if override:
        return override
    this_host = socket.gethostname()
    per_host = repo.get("host_paths") or {}
    registered = per_host.get(this_host)
    if registered:
        return registered
    return repo.get("local_path") or ""
# Spellings of "no value" that can reach us as plain strings.
_NULLISH_ID_TOKENS = ("~", "null", "None", "none")


def _clean_ref_id(raw: Any) -> str:
    """Return a state-hub ID read from YAML as a plain string ('' when unset)."""
    if raw is None:
        return ""
    text = str(raw).strip().strip('"')
    return "" if text in _NULLISH_ID_TOKENS else text


def _load_workplans(
    workplans_dir: Path, report: ConsistencyReport
) -> list[tuple[Path, dict, str]]:
    """Parse every workplans/*.md file; unusable files are reported as C-02."""
    loaded: list[tuple[Path, dict, str]] = []
    for wp_file in sorted(workplans_dir.glob("*.md")):
        try:
            text = wp_file.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError) as e:
            # UnicodeDecodeError is not an OSError; a non-UTF-8 file must not
            # abort the whole run.
            report.add(severity="FAIL", check_id="C-02",
                       message=f"Cannot read file: {e}", file_path=wp_file.name)
            continue
        if not text.startswith("---"):
            report.add(severity="FAIL", check_id="C-02",
                       message="No YAML frontmatter found", file_path=wp_file.name)
            continue
        meta, body = parse_frontmatter(text)
        # Frontmatter that is valid YAML but not a mapping is unusable too.
        if not isinstance(meta, dict) or not meta or meta.get("_parse_error"):
            report.add(severity="FAIL", check_id="C-02",
                       message="YAML frontmatter parse error", file_path=wp_file.name)
            continue
        loaded.append((wp_file, meta, body))
    return loaded


def _check_workplan_tasks(
    api_base: str,
    report: ConsistencyReport,
    wp_file: Path,
    meta: dict,
    body: str,
    ws: dict,
    ws_id: str,
) -> None:
    """Run the task-level checks (task C-03, C-10 to C-13) for one linked workplan."""
    fname = wp_file.name
    tasks = get_tasks_from_workplan(meta, body)
    listing = _api_get(api_base, "/tasks", {"workstream_id": ws_id})
    db_tasks: list = listing if isinstance(listing, list) else []
    db_task_by_id: dict[str, dict] = {t["id"]: t for t in db_tasks}

    file_task_sh_ids: set[str] = set()
    for task in tasks:
        # Entries of a frontmatter tasks: list are not guaranteed to be mappings.
        if not isinstance(task, dict) or task.get("_parse_error"):
            continue
        t_id = str(task.get("id", "")).strip()
        t_sh_id = _clean_ref_id(task.get("state_hub_task_id"))
        t_status = str(task.get("status", "")).strip()

        if t_sh_id:
            file_task_sh_ids.add(t_sh_id)
            # Reuse the listing fetched above; only ask the API again for IDs
            # that are not part of this workstream's listing.
            db_task = db_task_by_id.get(t_sh_id)
            if db_task is None:
                db_task = _api_get(api_base, f"/tasks/{t_sh_id}")
            if db_task is None:
                report.add(
                    severity="FAIL", check_id="C-03",
                    message=f"state_hub_task_id {t_sh_id[:8]}… not found in DB",
                    file_path=f"{fname}#{t_id}",
                    db_id=t_sh_id,
                    fixable=False,
                )
                continue
            # C-10: task status drift
            db_t_status = db_task.get("status", "")
            if t_status and db_t_status and t_status != db_t_status:
                report.add(
                    severity="WARN", check_id="C-10",
                    message=(
                        f"Task status drift '{t_id}': "
                        f"file={t_status!r} db={db_t_status!r} (file wins)"
                    ),
                    file_path=f"{fname}#{t_id}",
                    db_id=t_sh_id,
                    file_value=t_status,
                    db_value=db_t_status,
                    fixable=True,
                    _fix_context={"task_id": t_sh_id, "status": t_status},
                )
        elif t_id:
            # C-11: task exists in file but is not linked to the DB
            report.add(
                severity="WARN", check_id="C-11",
                message=f"Task '{t_id}' has no state_hub_task_id",
                file_path=f"{fname}#{t_id}",
                fixable=True,
                _fix_context={
                    "ws_id": ws_id,
                    "task": task,
                    "wp_file": str(wp_file),
                    "meta": meta,
                    "body": body,
                },
            )

    # C-12: DB tasks with no file backing
    for db_t in db_tasks:
        if db_t["id"] not in file_task_sh_ids:
            report.add(
                severity="WARN", check_id="C-12",
                message=(
                    f"DB task '{db_t.get('title', '')}' "
                    f"(id={db_t['id'][:8]}…, status={db_t.get('status', '')}) "
                    f"in workstream '{ws.get('slug')}' has no file backing"
                ),
                db_id=db_t["id"],
                fixable=False,
            )

    # C-13: all DB tasks done but workstream still active — worker forgot to close
    if ws.get("status", "") == "active" and db_tasks:
        all_terminal = all(
            t.get("status") in ("done", "cancelled") for t in db_tasks
        )
        if all_terminal:
            report.add(
                severity="WARN", check_id="C-13",
                message=(
                    f"All {len(db_tasks)} DB tasks for '{ws.get('slug')}' are "
                    f"done/cancelled but workstream status is still 'active' — "
                    f"worker likely forgot update_workstream_status()"
                ),
                file_path=fname,
                db_id=ws_id,
                db_value="active",
                fixable=True,
                _fix_context={
                    "ws_id": ws_id,
                    "field": "status",
                    "value": "completed",
                },
            )


def check_repo(api_base: str, repo_slug: str, repo_path_override: str | None = None) -> ConsistencyReport:
    """Run all consistency checks for a registered repo.

    Args:
        api_base: Base URL of the state-hub API.
        repo_slug: Slug of the registered repo to check.
        repo_path_override: Optional local path that takes priority over the
            paths stored for the repo (see ``resolve_repo_path``).

    Returns:
        A ConsistencyReport holding every finding. Nothing is modified; fixable
        findings carry the context ``fix_repo`` needs in ``_fix_context``.
    """
    repo = _api_get(api_base, f"/repos/{repo_slug}")
    if repo is None:
        report = ConsistencyReport(repo_slug=repo_slug, repo_path="(unknown)")
        report.add(
            severity="FAIL", check_id="C-00",
            message=f"Repo '{repo_slug}' not found in state-hub DB",
            fixable=False,
        )
        return report

    repo_id: str = repo["id"]
    repo_path: str = resolve_repo_path(repo, repo_path_override)
    report = ConsistencyReport(repo_slug=repo_slug, repo_path=repo_path)

    if not repo_path:
        report.add(
            severity="FAIL", check_id="C-00",
            message=f"Repo '{repo_slug}' has no local_path — cannot check workplan files",
            fixable=False,
        )
        _check_orphan_db(api_base, repo_id, set(), report)
        return report

    workplans_dir = Path(repo_path) / "workplans"

    # C-01: workplans/ directory missing
    if not workplans_dir.is_dir():
        report.add(
            severity="FAIL", check_id="C-01",
            message=(
                "workplans/ directory missing — ADR-001 requires workplan files "
                "at <repo>/workplans/<ID>-<slug>.md"
            ),
            fixable=False,
        )
        _check_orphan_db(api_base, repo_id, set(), report)
        return report

    workplan_infos = _load_workplans(workplans_dir, report)
    file_ws_ids: set[str] = set()  # workstream IDs that have a backing file

    # Per-workplan checks
    for wp_file, meta, body in workplan_infos:
        fname = wp_file.name
        # A YAML null (~ / null) must count as "not linked", not as the ID "None".
        ws_id = _clean_ref_id(meta.get("state_hub_workstream_id"))
        file_status = str(meta.get("status", "")).strip()
        file_title = str(meta.get("title", "")).strip()
        file_domain = str(meta.get("domain", "")).strip()

        if not ws_id:
            # C-06: workplan not linked to any DB workstream
            report.add(
                severity="WARN", check_id="C-06",
                message="Workplan has no state_hub_workstream_id — not indexed in DB",
                file_path=fname,
                file_value=file_title or fname,
                fixable=True,
                _fix_context={
                    "wp_file": str(wp_file),
                    "meta": meta,
                    "body": body,
                    "repo_id": repo_id,
                    "domain": file_domain,
                },
            )
            continue

        file_ws_ids.add(ws_id)
        ws = _api_get(api_base, f"/workstreams/{ws_id}")
        if ws is None:
            # C-03: stale workstream reference
            report.add(
                severity="FAIL", check_id="C-03",
                message=f"state_hub_workstream_id {ws_id[:8]}… not found in DB (stale reference)",
                file_path=fname,
                db_id=ws_id,
                fixable=False,
            )
            continue

        # C-09: repo mismatch — file is here but DB says different repo.
        # Drift checks below still run for a mismatched repo.
        db_repo_id: str = ws.get("repo_id") or ""
        if db_repo_id != repo_id:
            report.add(
                severity="FAIL", check_id="C-09",
                message=(
                    f"Workstream '{ws.get('slug')}' repo_id in DB is "
                    f"{db_repo_id[:8] if db_repo_id else 'None'} "
                    f"but backing file lives in {repo_slug} ({repo_id[:8]}…)"
                ),
                file_path=fname,
                db_id=ws_id,
                file_value=repo_id,
                db_value=db_repo_id,
                fixable=True,
                _fix_context={"ws_id": ws_id, "correct_repo_id": repo_id},
            )

        # C-04: status drift — normalise the file value before comparing so
        # that "done" (file) vs "completed" (DB) is not treated as drift.
        db_status = ws.get("status", "")
        normalised_file_status = normalise_workstream_status(file_status)
        if file_status and db_status and normalised_file_status != db_status:
            report.add(
                severity="WARN", check_id="C-04",
                message=(
                    f"Status drift in '{ws.get('slug')}': "
                    f"file={file_status!r} db={db_status!r} (file wins)"
                ),
                file_path=fname,
                db_id=ws_id,
                file_value=file_status,
                db_value=db_status,
                fixable=True,
                _fix_context={
                    "ws_id": ws_id,
                    "field": "status",
                    "value": normalised_file_status,  # always send DB-valid value
                },
            )

        # C-05: title drift
        db_title = ws.get("title", "")
        if file_title and db_title and file_title != db_title:
            report.add(
                severity="WARN", check_id="C-05",
                message=(
                    f"Title drift in '{ws.get('slug')}': "
                    f"file={file_title!r} db={db_title!r} (file wins)"
                ),
                file_path=fname,
                db_id=ws_id,
                file_value=file_title,
                db_value=db_title,
                fixable=True,
                _fix_context={"ws_id": ws_id, "field": "title", "value": file_title},
            )

        # C-03 (tasks), C-10, C-11, C-12, C-13
        _check_workplan_tasks(api_base, report, wp_file, meta, body, ws, ws_id)

    # C-07 / C-08: orphan DB workstreams (have repo_id=this_repo but no backing file)
    _check_orphan_db(api_base, repo_id, file_ws_ids, report)
    return report
def _check_orphan_db(
    api_base: str, repo_id: str, file_ws_ids: set[str], report: ConsistencyReport
) -> None:
    """Report DB workstreams of this repo that no workplan file refers to.

    Active orphans are a C-07 FAIL, completed/archived ones a C-08 INFO;
    workstreams in any other status are ignored. Does nothing when the
    workstream listing cannot be fetched.
    """
    listing = _api_get(api_base, "/workstreams", {"repo_id": repo_id})
    if not isinstance(listing, list):
        return

    for row in listing:
        row_id = row["id"]
        if row_id in file_ws_ids:
            continue
        state = row.get("status", "")
        slug = row.get("slug", "")

        if state == "active":
            level, code = "FAIL", "C-07"
            text = (
                f"Active DB workstream '{slug}' (id={row_id[:8]}…) "
                f"has no backing workplan file — ADR-001 violation"
            )
        elif state in ("completed", "archived"):
            level, code = "INFO", "C-08"
            text = (
                f"Completed/archived DB workstream '{slug}' "
                f"(id={row_id[:8]}…, status={state}) has no backing workplan file"
            )
        else:
            continue

        report.add(
            severity=level, check_id=code,
            message=text,
            db_id=row_id,
            fixable=False,
        )
# ---------------------------------------------------------------------------
# Fix engine
# ---------------------------------------------------------------------------
def _record_patch_result(
    report: ConsistencyReport, check_id: str, description: str, result: Any
) -> None:
    """Log the outcome of an ``_api_patch`` call in ``report.fixes_applied``.

    ``_api_patch`` returns None when no HTTP client is available (nothing is
    logged), a dict with an ``_error`` key on failure, and the response body on
    success.
    """
    if result is None:
        return
    if isinstance(result, dict) and "_error" in result:
        report.fixes_applied.append(
            f"{check_id} FAILED: {description}: {result['_error']}"
        )
    else:
        report.fixes_applied.append(f"{check_id} fixed: {description}")


def _create_and_link_task(
    api_base: str, ws_id: str, task: dict, wp_file: Path
) -> str | None:
    """Create a DB task for a file task and write the new ID back into the file.

    Status and priority fall back to "todo" / "medium" when the file value is
    not in the allowed vocabulary. Returns the new DB task ID, or None when the
    API call did not succeed.
    """
    t_id = str(task.get("id", "")).strip()
    t_status = str(task.get("status", "todo")).strip()
    if t_status not in VALID_TASK_STATUSES:
        t_status = "todo"
    t_priority = str(task.get("priority", "medium")).strip()
    if t_priority not in VALID_TASK_PRIORITIES:
        t_priority = "medium"

    created = _api_post(api_base, "/tasks", {
        "workstream_id": ws_id,
        "title": str(task.get("title", t_id)).strip() or t_id,
        "status": t_status,
        "priority": t_priority,
        "assignee": task.get("assignee") or None,
    })
    if not created:
        return None
    t_db_id = created["id"]
    # Tasks live either in ```task blocks or in the frontmatter tasks: list.
    if not _inject_task_id_into_block(wp_file, "state_hub_task_id", t_db_id, t_id):
        _inject_task_id_frontmatter_list(wp_file, t_db_id, t_id)
    return t_db_id


def _fix_unlinked_workplan(api_base: str, report: ConsistencyReport, ctx: dict) -> None:
    """C-06 fix: create the DB workstream (and its tasks) for an unlinked workplan."""
    wp_file = Path(ctx["wp_file"])
    meta = ctx["meta"]
    domain = ctx["domain"]
    body = ctx.get("body", "")
    wp_id = str(meta.get("id", "")).strip()
    title = str(meta.get("title", "")).strip()
    # Translate file vocabulary first ("done" -> "completed"); otherwise a
    # finished workplan would be registered as "active".
    status = normalise_workstream_status(str(meta.get("status", "active")).strip())
    if status not in VALID_WP_STATUSES:
        status = "active"

    # Find the topic that belongs to this workplan's domain
    topics = _api_get(api_base, "/topics")
    topic_id = None
    if isinstance(topics, list):
        topic_id = next(
            (t["id"] for t in topics if t.get("domain_slug") == domain), None
        )
    if topic_id is None:
        report.fixes_applied.append(
            f"C-06 SKIP {wp_id}: no topic found for domain '{domain}'"
        )
        return

    slug = re.sub(r"[^a-z0-9-]", "-", wp_id.lower()).strip("-")
    ws_data = _api_post(api_base, "/workstreams", {
        "topic_id": topic_id,
        "repo_id": ctx["repo_id"],
        "slug": slug,
        "title": title or wp_id,
        "status": status,
        # "or ''" keeps a YAML null owner from being sent as the text "None".
        "owner": str(meta.get("owner") or "").strip() or None,
    })
    if ws_data is None:
        report.fixes_applied.append(
            f"C-06 FAIL {wp_id}: could not create workstream in DB"
        )
        return

    new_ws_id = ws_data["id"]
    _add_frontmatter_field(wp_file, "state_hub_workstream_id", new_ws_id)
    report.fixes_applied.append(
        f"C-06 fixed: created workstream {new_ws_id[:8]}… "
        f"for {wp_id}, wrote ID to {wp_file.name}"
    )

    # Create tasks and inject IDs
    for task in get_tasks_from_workplan(meta, body):
        if not isinstance(task, dict) or task.get("_parse_error"):
            continue
        t_id = str(task.get("id", "")).strip()
        if not t_id:
            continue
        t_db_id = _create_and_link_task(api_base, new_ws_id, task, wp_file)
        if t_db_id:
            report.fixes_applied.append(f" + task {t_id} → {t_db_id[:8]}…")


def fix_repo(api_base: str, repo_slug: str, repo_path_override: str | None = None) -> ConsistencyReport:
    """Run the checks, then apply every auto-fixable issue.

    Args:
        api_base: Base URL of the state-hub API.
        repo_slug: Slug of the registered repo.
        repo_path_override: Optional local path, forwarded to ``check_repo``.

    Returns:
        The report from ``check_repo`` with ``fixes_applied`` filled in. The
        issue list reflects the state before fixing. An exception raised while
        fixing one issue is logged as "<check id> ERROR: ..." and does not
        stop the remaining fixes.
    """
    report = check_repo(api_base, repo_slug, repo_path_override)

    for issue in [i for i in report.issues if i.fixable]:
        ctx = issue._fix_context
        try:
            if issue.check_id in ("C-04", "C-05", "C-13"):
                ws_id = ctx["ws_id"]
                result = _api_patch(api_base, f"/workstreams/{ws_id}",
                                    {ctx["field"]: ctx["value"]})
                _record_patch_result(
                    report, issue.check_id,
                    f"workstream {ws_id[:8]}… {ctx['field']} → {ctx['value']!r}",
                    result,
                )

            elif issue.check_id == "C-06":
                _fix_unlinked_workplan(api_base, report, ctx)

            elif issue.check_id == "C-09":
                ws_id = ctx["ws_id"]
                correct_repo_id = ctx["correct_repo_id"]
                result = _api_patch(api_base, f"/workstreams/{ws_id}",
                                    {"repo_id": correct_repo_id})
                # _api_patch signals failure with an "_error" dict, not None,
                # so the outcome has to be inspected before claiming success.
                _record_patch_result(
                    report, "C-09",
                    f"workstream {ws_id[:8]}… repo_id → {correct_repo_id[:8]}…",
                    result,
                )

            elif issue.check_id == "C-10":
                task_id = ctx["task_id"]
                status = ctx["status"]
                result = _api_patch(api_base, f"/tasks/{task_id}",
                                    {"status": status})
                _record_patch_result(
                    report, "C-10",
                    f"task {task_id[:8]}… status → {status!r}",
                    result,
                )

            elif issue.check_id == "C-11":
                task = ctx["task"]
                t_id = str(task.get("id", "")).strip()
                t_db_id = _create_and_link_task(
                    api_base, ctx["ws_id"], task, Path(ctx["wp_file"])
                )
                if t_db_id:
                    report.fixes_applied.append(
                        f"C-11 fixed: task '{t_id}' → {t_db_id[:8]}…"
                    )
        except Exception as e:
            # One broken fix must not prevent the others from being applied.
            report.fixes_applied.append(f"{issue.check_id} ERROR: {e}")

    # Record that a sync run happened for this repo (best effort; result unused)
    from datetime import datetime, timezone
    now_iso = datetime.now(timezone.utc).isoformat()
    _api_patch(api_base, f"/repos/{repo_slug}/", {"last_state_synced_at": now_iso})
    return report
# ---------------------------------------------------------------------------
# Output / rendering
# ---------------------------------------------------------------------------
def render_text(report: ConsistencyReport, show_info: bool = True) -> str:
    """Format *report* as the human-readable text report.

    Issues are grouped by severity (FAIL, WARN, INFO); the INFO group is left
    out when *show_info* is false. Applied fixes, the per-severity totals and
    the overall result follow.
    """
    rule = "=" * 66
    out = [
        "ADR-001 Consistency Report",
        f"Repo: {report.repo_slug}",
        f"Path: {report.repo_path}",
        rule,
    ]

    for level in ("FAIL", "WARN", "INFO"):
        if level == "INFO" and not show_info:
            continue
        matching = [item for item in report.issues if item.severity == level]
        if not matching:
            continue
        out.append(f"\n {level}S ({len(matching)}):")
        for item in matching:
            header = f" {item.check_id}"
            if item.file_path:
                header += f" [{item.file_path}]"
            if item.fixable:
                header += " [fixable]"
            out.append(header)
            out.append(f" {item.message}")
            if item.file_value or item.db_value:
                out.append(f" file={item.file_value!r} db={item.db_value!r}")

    applied = report.fixes_applied
    if applied:
        out.append(f"\n FIXES APPLIED ({len(applied)}):")
        out.extend(f" {entry}" for entry in applied)

    out.append(f"\n{rule}")
    fail_count = len(report.failures)
    warn_count = len(report.warnings)
    info_count = len(report.infos)
    out.append(f" {fail_count} fail | {warn_count} warn | {info_count} info")
    if fail_count:
        verdict = " RESULT: ✗ FAIL"
    elif warn_count:
        verdict = " RESULT: ✓ PASS (with warnings)"
    else:
        verdict = " RESULT: ✓ PASS"
    out.append(verdict)
    out.append(rule)
    return "\n".join(out)
def report_to_dict(report: ConsistencyReport) -> dict:
    """Convert *report* into a JSON-serialisable dict.

    Each issue is exported without its private ``_fix_context``. ``result`` is
    "fail" when there are failures, otherwise "warn" when there are warnings,
    otherwise "pass".
    """
    exported_fields = (
        "severity",
        "check_id",
        "message",
        "file_path",
        "db_id",
        "file_value",
        "db_value",
        "fixable",
    )
    issue_rows = [
        {name: getattr(item, name) for name in exported_fields}
        for item in report.issues
    ]
    totals = {
        "fail": len(report.failures),
        "warn": len(report.warnings),
        "info": len(report.infos),
    }
    if report.failures:
        verdict = "fail"
    elif report.warnings:
        verdict = "warn"
    else:
        verdict = "pass"
    return {
        "repo_slug": report.repo_slug,
        "repo_path": report.repo_path,
        "issues": issue_rows,
        "fixes_applied": report.fixes_applied,
        "summary": totals,
        "result": verdict,
    }
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
def main() -> None:
    """CLI entry point: parse arguments, run checks or fixes, print, and exit.

    Exit status is 1 when any report contains a FAIL, 2 when there are only
    WARNs, and 0 otherwise. With --all, exits 1 if the repo list cannot be
    fetched and 0 if no repo has a usable path on this host.
    """
    parser = argparse.ArgumentParser(
        description="ADR-001 consistency checker — bidirectional file↔DB validation",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--repo", metavar="SLUG",
                       help="Registered repo slug (e.g. the-custodian)")
    group.add_argument("--all", action="store_true",
                       help="Run checks against all registered repos that have a path "
                            "for this host (host_paths entry or local_path)")
    parser.add_argument("--fix", action="store_true",
                        help="Apply auto-fixable issues (status drift, repo mismatch, etc.)")
    parser.add_argument("--repo-path", metavar="PATH", default=None,
                        help="Override the local repo path (useful when the DB has a different "
                             "machine's path). Takes priority over host_paths and local_path.")
    parser.add_argument("--api-base", default="http://127.0.0.1:8000",
                        help="State Hub API base URL")
    parser.add_argument("--json", action="store_true", dest="as_json",
                        help="Output JSON instead of human-readable text")
    args = parser.parse_args()

    # Resolve repo list
    repo_slugs: list[str] = []
    if args.all:
        repos = _api_get(args.api_base, "/repos")
        if not isinstance(repos, list):
            print("ERROR: Could not fetch repos from state-hub API", file=sys.stderr)
            sys.exit(1)
        # Use the same resolution as the checker itself, so repos registered
        # only through host_paths for this machine are not skipped.
        repo_slugs = [r["slug"] for r in repos if resolve_repo_path(r)]
        if not repo_slugs:
            print("No repos with a local path registered for this host.", file=sys.stderr)
            sys.exit(0)
    else:
        repo_slugs = [args.repo]

    runner = fix_repo if args.fix else check_repo
    # --repo-path only applies to single-repo runs; silently ignored with --all
    path_override = args.repo_path if not args.all else None
    reports = [runner(args.api_base, slug, path_override) for slug in repo_slugs]

    if args.as_json:
        # A single report is printed as an object, several as a list.
        output = (
            report_to_dict(reports[0])
            if len(reports) == 1
            else [report_to_dict(r) for r in reports]
        )
        print(json.dumps(output, indent=2))
    else:
        for report in reports:
            print(render_text(report))
            print()

    any_fail = any(r.failures for r in reports)
    any_warn = any(r.warnings for r in reports)
    sys.exit(1 if any_fail else 2 if any_warn else 0)


if __name__ == "__main__":
    main()