generated from coulomb/repo-seed
2350 lines
94 KiB
Python
2350 lines
94 KiB
Python
#!/usr/bin/env python3
|
||
"""consistency_check.py — ADR-001 consistency checking engine.
|
||
|
||
Runs bidirectional checks between workplan files in a registered repo and the
|
||
state-hub DB. The file is always authoritative; the DB is the cache/index layer.
|
||
|
||
Checks:
|
||
C-01 workplans-dir FAIL No workplans/ directory missing
|
||
C-02 workplan-parse FAIL No Workplan file cannot be parsed
|
||
C-03 workstream-stale-ref FAIL No state_hub_workstream_id in file not in DB
|
||
C-04 workstream-status-drift WARN Yes File status != DB status (file wins)
|
||
C-05 workstream-title-drift WARN Yes File title != DB title (file wins)
|
||
C-06 workstream-unlinked WARN Yes Workplan has no state_hub_workstream_id
|
||
C-07 orphan-db-active FAIL No Active DB workstream, no backing file
|
||
C-08 orphan-db-closed INFO No Finished/archived DB workstream, no file
|
||
C-09 workstream-repo-mismatch FAIL Yes DB workstream repo_id != file location
|
||
C-10 task-status-drift WARN Yes Task status differs between file and DB
|
||
C-11 task-unlinked WARN Yes Task block has no state_hub_task_id
|
||
C-12 orphan-db-task WARN No DB task in workstream has no file backing
|
||
C-13 workstream-auto-complete WARN Yes All DB tasks done but workstream still active
|
||
C-14 ghost-duplicate WARN No Active topic workstream with no repo_id matches a file-backed title — probable ghost from premature create_workstream() call
|
||
C-15 task-db-ahead WARN Yes DB task status is ahead of file — regression prevented; writeback syncs file
|
||
C-16 repo-behind-remote WARN No Local repo is behind remote tracking branch — --fix skipped to avoid clobbering remote progress
|
||
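C-17 repo-ahead-push-failed WARN No Local repo has unpushed commits and push failed — writes skipped to prevent runaway divergence
C-18 workplan-archived-open FAIL No Archived workplan (workplans/archived/) still has an open or planning status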
|
||
C-19 workstream-planning-drift WARN Yes planning_priority/planning_order differs between file and DB
|
||
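C-20 workstream-dependency-missing WARN Yes Workplan dependency frontmatter missing from DB graph
C-21 workplan-ready-stale WARN No Ready workplan may be stale against reviewed_against_commit/context_paths — needs re-review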
|
||
C-22 task-description-drift WARN Yes Task description/content differs between file and DB
|
||
C-23 workstream-active-task-planning-status WARN Yes Workstream/workplan is planning while a task is progress or wait
|
||
|
||
Usage:
|
||
python scripts/consistency_check.py --repo SLUG [--fix] [--no-writeback] [--json] [--api-base URL]
|
||
python scripts/consistency_check.py --all [--fix] [--no-writeback] [--json] [--api-base URL]
|
||
python scripts/consistency_check.py --here [PATH] [--fix] [--no-writeback] [--json] [--api-base URL]
|
||
|
||
Exit codes:
|
||
0 — clean (no FAILs or WARNs; INFOs are allowed)
|
||
1 — one or more FAILs present
|
||
2 — warnings-only strict CLI result (no FAILs, but WARNs present)
|
||
|
||
Agent/operator Make wrappers normalize exit code 2 to shell success while
|
||
preserving visible warning output. Use the direct script when a machine caller
|
||
needs to distinguish clean from warnings-only.
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import os
|
||
import json
|
||
import re
|
||
import socket
|
||
import subprocess
|
||
import sys
|
||
import time
|
||
from contextlib import contextmanager
|
||
from dataclasses import dataclass, field
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
_REPO_ROOT = Path(__file__).resolve().parent.parent
|
||
if str(_REPO_ROOT) not in sys.path:
|
||
sys.path.insert(0, str(_REPO_ROOT))
|
||
|
||
from api.workplan_status import ( # noqa: E402
|
||
CANONICAL_WORKSTREAM_STATUSES,
|
||
CLOSED_WORKSTREAM_STATUSES,
|
||
LEGACY_WORKSTREAM_STATUS_ALIASES,
|
||
OPEN_WORKSTREAM_STATUSES,
|
||
SUPPORTED_WORKSTREAM_STATUSES,
|
||
normalize_workstream_status as _normalize_workstream_status,
|
||
ready_review_status,
|
||
)
|
||
from api.services.lifecycle import should_activate_parent_for_active_tasks # noqa: E402
|
||
from api.task_status import ( # noqa: E402
|
||
CANONICAL_TASK_STATUSES,
|
||
OPEN_TASK_STATUSES,
|
||
TASK_STATUS_ORDER,
|
||
TERMINAL_TASK_STATUSES,
|
||
normalize_task_status,
|
||
)
|
||
|
||
try:
|
||
import yaml as _yaml
|
||
_HAS_YAML = True
|
||
except ImportError:
|
||
_HAS_YAML = False
|
||
|
||
try:
|
||
import httpx as _httpx
|
||
_HAS_HTTPX = True
|
||
except ImportError:
|
||
_HAS_HTTPX = False
|
||
|
||
|
||
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

# Fenced ```task``` blocks in a workplan body; group 1 is the YAML payload.
_TASK_BLOCK_RE = re.compile(r"```task\s*\n(.*?)\n```", re.DOTALL)
# Markdown headings of levels 1-4; group 1 = the '#' run, group 2 = heading text.
_HEADING_RE = re.compile(r"^(#{1,4})\s+(.+?)$", re.MULTILINE)
# Archived workplan names: six-digit completion-date prefix, a dash, then the
# original filename (group 1).
_ARCHIVED_WP_RE = re.compile(r"^\d{6}-(.+\.md)$")
VALID_WP_STATUSES = set(CANONICAL_WORKSTREAM_STATUSES)
SUPPORTED_WP_STATUSES = set(SUPPORTED_WORKSTREAM_STATUSES)
VALID_TASK_STATUSES = set(CANONICAL_TASK_STATUSES)
VALID_TASK_PRIORITIES = {"low", "medium", "high", "critical"}
VALID_DEP_RELATIONSHIPS = {"blocks", "starts_after", "informs", "soft_dependency"}


def _int_from_env(name: str, default: int) -> int:
    """Read an integer environment variable, falling back to *default*.

    Unset or blank values use the default silently. A non-integer value is
    reported on stderr and replaced by the default instead of raising
    ValueError at import time, which would make the whole script unusable.
    """
    raw = os.environ.get(name, "").strip()
    if not raw:
        return default
    try:
        return int(raw)
    except ValueError:
        print(
            f"WARNING: ignoring non-integer {name}={raw!r}; using {default}",
            file=sys.stderr,
        )
        return default


# Limit in seconds read from CONSISTENCY_REMOTE_ALL_MAX_SECONDS (default 300);
# its consumer is outside this view.
DEFAULT_REMOTE_ALL_MAX_SECONDS = _int_from_env("CONSISTENCY_REMOTE_ALL_MAX_SECONDS", 300)

# Legacy file/API aliases translated before comparison and PATCHing.
FILE_TO_DB_WORKSTREAM_STATUS: dict[str, str] = dict(LEGACY_WORKSTREAM_STATUS_ALIASES)

# Ordinal ranking for task statuses used by the no-regress rule (T01/C-15).
STATUS_ORDER: dict[str, int] = dict(TASK_STATUS_ORDER)
|
||
|
||
|
||
def normalise_workstream_status(status: str, *, has_started: bool | None = None) -> str:
    """Map a workplan-file status onto its DB-canonical workstream status.

    British-spelling wrapper around ``api.workplan_status.normalize_workstream_status``;
    *has_started* is forwarded unchanged.
    """
    canonical = _normalize_workstream_status(status, has_started=has_started)
    return canonical
|
||
|
||
|
||
def normalise_task_status(status: Any, *, default: str = "todo") -> str:
    """Canonicalise a task status via ``api.task_status.normalize_task_status``.

    Any value that helper rejects with ValueError collapses to *default*
    instead of propagating.
    """
    try:
        canonical = normalize_task_status(status, default=default)
    except ValueError:
        canonical = default
    return canonical
|
||
|
||
|
||
def canonical_workplan_filename(path: Path) -> str:
    """Strip the archive completion-date prefix (six digits and a dash) from *path*'s name.

    Filenames without the prefix are returned unchanged.
    """
    name = path.name
    found = _ARCHIVED_WP_RE.match(name)
    if found is None:
        return name
    # Keep anything the anchored pattern left unconsumed, exactly as re.sub would.
    return found.group(1) + name[found.end():]
|
||
|
||
|
||
def workplan_display_path(repo_dir: Path, path: Path) -> str:
    """Render *path* relative to *repo_dir* for reports, keeping any ``archived/`` segment.

    Paths that are not under *repo_dir* fall back to their bare filename.
    """
    try:
        relative = path.relative_to(repo_dir)
    except ValueError:
        return path.name
    return str(relative)
|
||
|
||
|
||
def iter_workplan_files(workplans_dir: Path, include_archived: bool = True) -> list[Path]:
    """List ``*.md`` workplans in *workplans_dir*, then those in its ``archived/`` subdirectory.

    Each group is sorted independently. The archived group is included only
    when *include_archived* is true and the subdirectory exists.
    """
    archived_dir = workplans_dir / "archived"
    active = sorted(workplans_dir.glob("*.md"))
    if not include_archived or not archived_dir.is_dir():
        return active
    return active + sorted(archived_dir.glob("*.md"))
|
||
|
||
|
||
# ---------------------------------------------------------------------------
# Data types
# ---------------------------------------------------------------------------

@dataclass
class Issue:
    """One finding produced by a consistency check.

    ``_fix_context`` carries whatever the --fix path needs to repair the
    finding (ids, file paths, target values); it is excluded from ``repr``.
    """

    severity: str  # FAIL | WARN | INFO
    check_id: str  # "C-00" (repo/API lookup failures) through "C-23"; see the module docstring
    message: str
    file_path: str = ""  # workplan path for file-backed findings ("" otherwise)
    db_id: str = ""  # DB workstream/task id involved, if any
    file_value: str = ""  # file-side value being compared, when applicable
    db_value: str = ""  # DB-side (or comparison) value, when applicable
    fixable: bool = False  # True when the finding is eligible for automatic repair
    _fix_context: dict = field(default_factory=dict, repr=False)
|
||
|
||
|
||
@dataclass
class ConsistencyReport:
    """All findings for one repo, plus a log of fixes that were applied."""

    repo_slug: str
    repo_path: str
    issues: list[Issue] = field(default_factory=list)
    fixes_applied: list[str] = field(default_factory=list)

    def add(self, **kwargs) -> Issue:
        """Build an Issue from *kwargs*, record it on this report, and return it."""
        self.issues.append(Issue(**kwargs))
        return self.issues[-1]

    def _of_severity(self, level: str) -> list[Issue]:
        # Issues whose severity equals *level*, in insertion order.
        return list(filter(lambda issue: issue.severity == level, self.issues))

    @property
    def failures(self) -> list[Issue]:
        """Issues with severity FAIL."""
        return self._of_severity("FAIL")

    @property
    def warnings(self) -> list[Issue]:
        """Issues with severity WARN."""
        return self._of_severity("WARN")

    @property
    def infos(self) -> list[Issue]:
        """Issues with severity INFO."""
        return self._of_severity("INFO")
|
||
|
||
|
||
@dataclass(frozen=True)
class RenormalizationRule:
    """Immutable metadata describing one lifecycle renormalization guard."""

    check_id: str  # check id the guard reports under, e.g. "C-23"
    invariant: str  # the lifecycle rule being protected
    detection: str  # condition that signals a violation
    repair: str  # corrective action taken when fixing
    test_anchor: str  # pytest node id covering the guard


# Registry of lifecycle renormalization guards.
RENORMALIZATION_RULES: tuple[RenormalizationRule, ...] = (
    RenormalizationRule(
        test_anchor="tests/test_consistency_check.py::TestLifecycleRenormalization",
        check_id="C-23",
        repair="Patch the DB workstream and workplan frontmatter to status=active.",
        invariant="Planning-state workplans cannot contain active task work.",
        detection=(
            "Workplan status is proposed, ready, or backlog "
            "while a linked task is progress or wait."
        ),
    ),
)

# Ordered steps to follow when introducing the next renormalization guard.
RENORMALIZATION_NEXT_GUARD_CHECKLIST: tuple[str, ...] = tuple(
    [
        "Name the invariant and assign the next C-id.",
        "Add rule metadata to RENORMALIZATION_RULES.",
        "Add detection in check_repo before generic drift rules that could fight it.",
        "Add the repair branch in fix_repo, using file writes only through helpers.",
        "Add detection and repair tests under TestLifecycleRenormalization.",
    ]
)
|
||
|
||
|
||
@contextmanager
def run_lock(name: str):
    """Hold a nonblocking, exclusive process lock for long-running consistency modes.

    Yields True when the lock was acquired and False when it is already held
    elsewhere, so the caller can decide to skip its work. On platforms without
    ``fcntl`` no locking is attempted and True is yielded.

    The lock file is ``$CONSISTENCY_LOCK_DIR/custodian-<name>.lock`` (directory
    defaults to ``/tmp``). The holder writes ``"<pid> <UTC ISO time>Z"`` into it.
    """
    try:
        import fcntl
    except ImportError:
        # No flock() on this platform: proceed unlocked.
        yield True
        return

    from datetime import timezone

    lock_path = Path(os.environ.get("CONSISTENCY_LOCK_DIR", "/tmp")) / f"custodian-{name}.lock"
    lock_path.parent.mkdir(parents=True, exist_ok=True)
    # "a+" creates the file without truncating it; opening with "w" would wipe
    # the current holder's PID line before we learn the lock is taken.
    handle = lock_path.open("a+", encoding="utf-8")
    try:
        try:
            fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except BlockingIOError:
            acquired = False
        else:
            acquired = True
            # We own the lock now, so replacing the previous contents is safe.
            handle.seek(0)
            handle.truncate()
            stamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
            handle.write(f"{os.getpid()} {stamp}Z\n")
            handle.flush()
        yield acquired
    finally:
        try:
            fcntl.flock(handle, fcntl.LOCK_UN)
        except OSError:
            pass
        handle.close()
|
||
|
||
|
||
# ---------------------------------------------------------------------------
# YAML / frontmatter parsing
# ---------------------------------------------------------------------------

def _parse_yaml_block(raw: str) -> dict:
    """Parse a YAML mapping; fall back to flat ``key: value`` parsing without PyYAML.

    Always returns a dict:

    * an empty (or falsy) document yields ``{}``;
    * malformed YAML, or YAML whose top level is not a mapping (a bare scalar
      or a list), yields ``{"_parse_error": True}`` so callers report it
      instead of crashing on ``.get`` / item assignment.

    The fallback parser only understands unindented ``key: value`` lines.
    Indented lines, ``#`` comment lines and lines without a key are skipped,
    and surrounding quotes are stripped from values.
    """
    if _HAS_YAML:
        try:
            loaded = _yaml.safe_load(raw)
        except _yaml.YAMLError:
            return {"_parse_error": True}
        if not loaded:
            return {}
        if not isinstance(loaded, dict):
            return {"_parse_error": True}
        return loaded

    result: dict = {}
    for line in raw.splitlines():
        # Skip blanks, nested (space- or tab-indented) lines and comments.
        if not line or line[0].isspace() or line.startswith("#"):
            continue
        key, sep, value = line.partition(":")
        if not sep or not key.strip():
            continue
        result[key.strip()] = value.strip().strip('"').strip("'")
    return result
|
||
|
||
|
||
def parse_frontmatter(text: str) -> tuple[dict, str]:
    """Split leading YAML frontmatter from the markdown body.

    Frontmatter must open on the first line and close on a later line, each
    consisting solely of ``---`` (surrounding whitespace allowed). Delimiters
    are matched per line, so a ``---`` sequence inside a frontmatter value no
    longer truncates the metadata.

    Returns ``(meta, body)``, where *body* is everything after the closing
    ``---`` marker (normally starting with that line's newline). Returns
    ``({}, text)`` when no complete frontmatter block is present.
    """
    if not text.startswith("---"):
        return {}, text
    lines = text.splitlines(keepends=True)
    if lines[0].strip() != "---":
        return {}, text
    for close_idx in range(1, len(lines)):
        if lines[close_idx].strip() == "---":
            break
    else:
        # Opening delimiter never closed.
        return {}, text

    closing = lines[close_idx]
    marker_end = closing.index("---") + 3
    body = closing[marker_end:] + "".join(lines[close_idx + 1:])
    meta = _parse_yaml_block("".join(lines[1:close_idx]).strip())
    return meta, body
|
||
|
||
|
||
def _clean_task_description(raw: str) -> str | None:
|
||
"""Trim section chrome while preserving task markdown content."""
|
||
lines = raw.splitlines()
|
||
while lines and not lines[0].strip():
|
||
lines.pop(0)
|
||
while lines and not lines[-1].strip():
|
||
lines.pop()
|
||
while lines and lines[-1].strip() in {"---", "***", "___"}:
|
||
lines.pop()
|
||
while lines and not lines[-1].strip():
|
||
lines.pop()
|
||
while lines and lines[0].strip() in {"---", "***", "___"}:
|
||
lines.pop(0)
|
||
while lines and not lines[0].strip():
|
||
lines.pop(0)
|
||
text = "\n".join(lines).strip()
|
||
return text or None
|
||
|
||
|
||
def parse_task_blocks(body: str) -> list[dict]:
    """Extract ```task``` blocks from a workplan body.

    Each block's YAML becomes a dict. When it has no ``title``, the text of
    the nearest heading (levels 1-4) before the block is used. The markdown
    after the block becomes ``description``. It runs until the next task
    block, or the first heading after the block that is at the same or a
    shallower level than the preceding heading. When no heading precedes
    the block, it stops at the first heading of any level.

    A block whose YAML does not parse to a mapping becomes
    ``{"_parse_error": True, ...}`` (callers skip such entries) instead of
    crashing on item assignment.
    """
    from bisect import bisect_left, bisect_right

    headings = [
        (m.start(), len(m.group(1)), m.group(2).strip())
        for m in _HEADING_RE.finditer(body)
    ]
    # finditer yields matches in order, so positions are ascending.
    heading_positions = [pos for pos, _level, _text in headings]
    task_matches = list(_TASK_BLOCK_RE.finditer(body))
    results: list[dict] = []

    for idx, m in enumerate(task_matches):
        task = _parse_yaml_block(m.group(1).strip())
        if not isinstance(task, dict):
            task = {"_parse_error": True}

        # Nearest heading strictly before this block.
        before = bisect_left(heading_positions, m.start())
        prev_heading = headings[before - 1] if before else None
        if "title" not in task and prev_heading:
            task["title"] = prev_heading[2]

        content_end = len(body)
        if idx + 1 < len(task_matches):
            content_end = min(content_end, task_matches[idx + 1].start())

        # Index of the first heading strictly after the block's closing fence.
        after = bisect_right(heading_positions, m.end())
        if prev_heading:
            current_level = prev_heading[1]
            next_peer_heading = next(
                (pos for pos, level, _text in headings[after:] if level <= current_level),
                None,
            )
            if next_peer_heading is not None:
                content_end = min(content_end, next_peer_heading)
        elif after < len(headings):
            content_end = min(content_end, headings[after][0])

        description = _clean_task_description(body[m.end():content_end])
        if description:
            task["description"] = description
        results.append(task)
    return results
|
||
|
||
|
||
def get_tasks_from_workplan(meta: dict, body: str) -> list[dict]:
    """Return a workplan's tasks, from ```task``` blocks or a frontmatter ``tasks:`` list.

    Task blocks in *body* take precedence. Only when there are none is
    ``meta["tasks"]`` used. Non-mapping entries in that list are replaced by
    ``{"_parse_error": True}``, the same marker used for unparsable task
    blocks, so callers that call ``.get`` on each task do not crash.
    Returns ``[]`` when neither source provides tasks.
    """
    blocks = parse_task_blocks(body)
    if blocks:
        return blocks
    # Fallback: tasks embedded as a YAML list in frontmatter (activity-core style)
    tasks = meta.get("tasks")
    if not isinstance(tasks, list):
        return []
    return [task if isinstance(task, dict) else {"_parse_error": True} for task in tasks]
|
||
|
||
|
||
def _as_list(value: Any) -> list[str]:
|
||
if value is None:
|
||
return []
|
||
if isinstance(value, list):
|
||
return [str(item).strip().strip('"') for item in value if str(item).strip()]
|
||
if isinstance(value, str):
|
||
return [item.strip().strip('"') for item in value.split(",") if item.strip()]
|
||
return [str(value).strip().strip('"')]
|
||
|
||
|
||
def _as_int_or_none(value: Any) -> int | None:
|
||
if value in (None, "", "~", "null", "None", "none"):
|
||
return None
|
||
try:
|
||
return int(value)
|
||
except (TypeError, ValueError):
|
||
return None
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# File update helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _add_frontmatter_field(file_path: Path, key: str, value: str) -> None:
|
||
"""Insert key: "value" into frontmatter before the closing --- line."""
|
||
text = file_path.read_text(encoding="utf-8")
|
||
lines = text.split("\n")
|
||
close_idx = None
|
||
for i, line in enumerate(lines[1:], 1):
|
||
if line.strip() == "---":
|
||
close_idx = i
|
||
break
|
||
if close_idx is None:
|
||
return
|
||
lines.insert(close_idx, f'{key}: "{value}"')
|
||
file_path.write_text("\n".join(lines), encoding="utf-8")
|
||
|
||
|
||
def _patch_frontmatter_field(file_path: Path, key: str, value: str) -> bool:
|
||
"""Update or insert a scalar frontmatter field without rewriting the file."""
|
||
text = file_path.read_text(encoding="utf-8")
|
||
if not text.startswith("---"):
|
||
return False
|
||
lines = text.split("\n")
|
||
close_idx = None
|
||
for i, line in enumerate(lines[1:], 1):
|
||
if line.strip() == "---":
|
||
close_idx = i
|
||
break
|
||
if close_idx is None:
|
||
return False
|
||
|
||
new_line = f"{key}: {value}"
|
||
for i in range(1, close_idx):
|
||
if re.match(rf"^\s*{re.escape(key)}\s*:", lines[i]):
|
||
if lines[i] == new_line:
|
||
return False
|
||
lines[i] = new_line
|
||
file_path.write_text("\n".join(lines), encoding="utf-8")
|
||
return True
|
||
|
||
lines.insert(close_idx, new_line)
|
||
file_path.write_text("\n".join(lines), encoding="utf-8")
|
||
return True
|
||
|
||
|
||
def _inject_task_id_into_block(
    file_path: Path, field_name: str, field_value: str, match_id: str
) -> bool:
    """Write ``field_name: "field_value"`` into every ```task``` block whose ``id`` is *match_id*.

    An existing non-empty value is never overwritten. A null-ish line (empty,
    ``~``, ``null``, ``None``, ``none``) is replaced in place; otherwise the
    field is appended to the block. Returns True when the file was rewritten.
    """
    text = file_path.read_text(encoding="utf-8")
    new_line = f'{field_name}: "{field_value}"'

    def _replace(m: re.Match) -> str:
        block_content = m.group(1)
        task_meta = _parse_yaml_block(block_content.strip())
        # Unparsable / non-mapping blocks cannot be matched by id; leave them alone.
        if not isinstance(task_meta, dict) or str(task_meta.get("id", "")) != match_id:
            return m.group(0)
        existing_val = task_meta.get(field_name)
        if existing_val is not None and str(existing_val).strip() not in ("", "~", "null", "None", "none"):
            return m.group(0)
        # Replace an existing null/~ line if present, otherwise append. A
        # callable replacement keeps backslashes in field_value literal
        # (a plain string would be parsed for escapes/group references).
        new_content, replaced = re.subn(
            rf"^{re.escape(field_name)}:.*$",
            lambda _m: new_line,
            block_content,
            flags=re.MULTILINE,
        )
        if not replaced:
            new_content = block_content.rstrip() + "\n" + new_line
        return f"```task\n{new_content}\n```"

    new_text = _TASK_BLOCK_RE.sub(_replace, text)
    if new_text != text:
        file_path.write_text(new_text, encoding="utf-8")
        return True
    return False
|
||
|
||
|
||
def _inject_task_id_frontmatter_list(
    file_path: Path, field_value: str, match_id: str
) -> bool:
    """Set ``state_hub_task_id`` on frontmatter ``tasks:`` entries whose ``id`` is *match_id*.

    This is for workplans that list tasks in frontmatter rather than in
    ```task``` blocks. It requires PyYAML because the frontmatter is
    re-serialised; key order is preserved (``sort_keys=False``). Entries that
    already hold a non-empty id are left alone, while null-ish ids (``~``,
    ``null``, empty) are filled in, matching the ```task``` block writer.
    Returns True only when the file was rewritten. It returns False when
    nothing matched, or on a serialisation or write failure (best effort).
    """
    if not _HAS_YAML:
        return False
    text = file_path.read_text(encoding="utf-8")
    meta, body = parse_frontmatter(text)
    if not isinstance(meta, dict):
        return False
    tasks = meta.get("tasks")
    if not isinstance(tasks, list):
        return False

    changed = False
    for task in tasks:
        if not isinstance(task, dict) or str(task.get("id", "")) != match_id:
            continue
        existing = task.get("state_hub_task_id")
        if existing is not None and str(existing).strip() not in ("", "~", "null", "None", "none"):
            continue
        task["state_hub_task_id"] = field_value
        changed = True
    if not changed:
        return False

    try:
        new_meta_str = _yaml.safe_dump(
            meta, allow_unicode=True, default_flow_style=False, sort_keys=False
        )
        # body is everything after the closing '---' marker, so the document
        # is reassembled around the freshly dumped frontmatter.
        file_path.write_text(f"---\n{new_meta_str}---{body}", encoding="utf-8")
    except (_yaml.YAMLError, OSError, ValueError):
        return False
    return True
|
||
|
||
|
||
# ---------------------------------------------------------------------------
# API helpers
# ---------------------------------------------------------------------------

def _api_get(
    api_base: str,
    path: str,
    params: dict | None = None,
    *,
    return_error: bool = False,
) -> Any:
    """GET *path* from the State Hub API and return the decoded JSON body.

    ``None``-valued entries in *params* are dropped before sending. A 404
    always yields None (resource absent). Any other failure (httpx missing,
    non-2xx status, transport or JSON error) yields ``{"_error": <message>}``
    when *return_error* is set, otherwise None.
    """
    def _failure(message: str) -> Any:
        return {"_error": message} if return_error else None

    if not _HAS_HTTPX:
        return _failure("httpx is not installed")

    # Only append trailing slash to the path component, not to query strings
    needs_slash = "?" not in path and not path.endswith("/")
    url_path = path + "/" if needs_slash else path

    try:
        with _httpx.Client(base_url=api_base, timeout=10.0, follow_redirects=True) as client:
            query = {key: val for key, val in (params or {}).items() if val is not None}
            response = client.get(url_path, params=query or None)
            response.raise_for_status()
            return response.json()
    except _httpx.HTTPStatusError as exc:
        return None if exc.response.status_code == 404 else _failure(str(exc))
    except Exception as exc:
        return _failure(str(exc))
|
||
|
||
|
||
def _api_patch(api_base: str, path: str, body: dict) -> Any:
    """PATCH *body* as JSON to *path* on the State Hub API.

    Returns the decoded JSON response on success and None when httpx is not
    installed. Any request, HTTP-status or decoding failure returns
    ``{"_error": <message>}``.
    """
    if not _HAS_HTTPX:
        return None
    # Append the trailing slash to the path component only; adding it after a
    # query string would corrupt the last query value.
    if "?" not in path and not path.endswith("/"):
        path += "/"
    try:
        with _httpx.Client(base_url=api_base, timeout=10.0, follow_redirects=True) as c:
            r = c.patch(path, json=body)
            r.raise_for_status()
            return r.json()
    except Exception as exc:
        # Return a sentinel dict so callers can distinguish "API error" from "success"
        # and report it rather than silently dropping the fix.
        return {"_error": str(exc)}
|
||
|
||
|
||
def _api_post(api_base: str, path: str, body: dict) -> Any:
    """POST *body* as JSON to *path* on the State Hub API.

    Returns the decoded JSON response on success. Returns None when httpx is
    not installed or on any failure (transport error, non-2xx status,
    undecodable body); the cause is not reported.
    """
    if not _HAS_HTTPX:
        return None
    # Append the trailing slash to the path component only; adding it after a
    # query string would corrupt the last query value.
    if "?" not in path and not path.endswith("/"):
        path += "/"
    try:
        with _httpx.Client(base_url=api_base, timeout=10.0, follow_redirects=True) as c:
            r = c.post(path, json=body)
            r.raise_for_status()
            return r.json()
    except Exception:
        return None
|
||
|
||
|
||
# ---------------------------------------------------------------------------
# Core check engine
# ---------------------------------------------------------------------------

def resolve_repo_path(repo: dict, override: str | None = None) -> str:
    """Pick the local filesystem path of *repo* for the machine we are running on.

    Precedence:
      1. *override* (the explicit --repo-path CLI value), when truthy;
      2. ``host_paths[<current hostname>]`` (registered via POST /repos/{slug}/paths/);
      3. the legacy single ``local_path`` field.

    Returns an empty string when none of these yields a path.
    """
    if override:
        return override
    per_host = repo.get("host_paths") or {}
    candidates = (per_host.get(socket.gethostname()), repo.get("local_path"))
    return next((candidate for candidate in candidates if candidate), "")
|
||
|
||
|
||
def _infer_slug_from_path(api_base: str, path: str) -> "tuple[str, str] | None":
|
||
"""Identify a registered repo from a local checkout path.
|
||
|
||
Strategy (in order):
|
||
1. Root-commit fingerprint — ``git rev-list --max-parents=0 HEAD`` produces
|
||
the same SHA-1 on every clone, independent of remote URL or checkout path.
|
||
Looked up via ``GET /repos/by-fingerprint?hash=<sha>``.
|
||
2. Remote URL fallback — exact string match against ``remote_url`` field.
|
||
Less reliable across machines (SSH aliases, HTTP vs HTTPS, etc.) but
|
||
useful when fingerprint is not yet stored.
|
||
|
||
Returns ``(slug, git_root)`` on success, ``None`` if no match found.
|
||
"""
|
||
try:
|
||
git_root = subprocess.check_output(
|
||
["git", "rev-parse", "--show-toplevel"],
|
||
cwd=path, stderr=subprocess.DEVNULL, text=True,
|
||
).strip()
|
||
except (subprocess.CalledProcessError, FileNotFoundError, OSError):
|
||
return None
|
||
|
||
# Strategy 1: fingerprint lookup (most reliable)
|
||
try:
|
||
fingerprint = subprocess.check_output(
|
||
["git", "rev-list", "--max-parents=0", "HEAD"],
|
||
cwd=git_root, stderr=subprocess.DEVNULL, text=True,
|
||
).strip()
|
||
except (subprocess.CalledProcessError, FileNotFoundError, OSError):
|
||
fingerprint = ""
|
||
|
||
# Get local remote URL once — used for both disambiguation and fallback
|
||
try:
|
||
remote_url = subprocess.check_output(
|
||
["git", "remote", "get-url", "origin"],
|
||
cwd=git_root, stderr=subprocess.DEVNULL, text=True,
|
||
).strip()
|
||
except (subprocess.CalledProcessError, FileNotFoundError, OSError):
|
||
remote_url = ""
|
||
|
||
if fingerprint:
|
||
# Try fingerprint + remote URL for precise match
|
||
if remote_url:
|
||
import urllib.parse as _up
|
||
candidates = _api_get(
|
||
api_base,
|
||
f"/repos/by-fingerprint?hash={fingerprint}&remote_url={_up.quote(remote_url, safe='')}",
|
||
)
|
||
if isinstance(candidates, list) and len(candidates) == 1:
|
||
return candidates[0]["slug"], git_root
|
||
|
||
# Fingerprint alone (works when repos don't share ancestry)
|
||
candidates = _api_get(api_base, f"/repos/by-fingerprint?hash={fingerprint}")
|
||
if isinstance(candidates, list):
|
||
if len(candidates) == 1:
|
||
return candidates[0]["slug"], git_root
|
||
if len(candidates) > 1:
|
||
# Disambiguate: prefer the repo whose slug appears in the git_root path
|
||
for repo in candidates:
|
||
if repo["slug"] in git_root:
|
||
return repo["slug"], git_root
|
||
# Can't disambiguate — return first match with a warning
|
||
print(
|
||
f" WARNING: {len(candidates)} repos share fingerprint {fingerprint[:12]}… "
|
||
f"— using '{candidates[0]['slug']}'. "
|
||
"Set remote_url on each repo for accurate matching.",
|
||
file=sys.stderr,
|
||
)
|
||
return candidates[0]["slug"], git_root
|
||
|
||
# Strategy 2: remote URL exact match (fallback)
|
||
if remote_url:
|
||
import urllib.parse as _up
|
||
repo = _api_get(api_base, f"/repos/by-remote?url={_up.quote(remote_url, safe='')}")
|
||
if repo and isinstance(repo, dict) and "slug" in repo:
|
||
return repo["slug"], git_root
|
||
|
||
return None
|
||
|
||
|
||
def check_repo(api_base: str, repo_slug: str, repo_path_override: str | None = None) -> ConsistencyReport:
|
||
"""Run all consistency checks for a registered repo."""
|
||
repo = _api_get(api_base, f"/repos/{repo_slug}", return_error=True)
|
||
if isinstance(repo, dict) and "_error" in repo:
|
||
report = ConsistencyReport(repo_slug=repo_slug, repo_path=repo_path_override or "(unknown)")
|
||
report.add(
|
||
severity="FAIL", check_id="C-00",
|
||
message=(
|
||
f"Could not query State Hub API at {api_base}: {repo['_error']}"
|
||
),
|
||
fixable=False,
|
||
)
|
||
return report
|
||
if repo is None:
|
||
report = ConsistencyReport(repo_slug=repo_slug, repo_path="(unknown)")
|
||
report.add(
|
||
severity="FAIL", check_id="C-00",
|
||
message=f"Repo '{repo_slug}' not found in state-hub DB",
|
||
fixable=False,
|
||
)
|
||
return report
|
||
|
||
repo_id: str = repo["id"]
|
||
repo_path: str = resolve_repo_path(repo, repo_path_override)
|
||
report = ConsistencyReport(repo_slug=repo_slug, repo_path=repo_path)
|
||
|
||
if not repo_path:
|
||
report.add(
|
||
severity="FAIL", check_id="C-00",
|
||
message=f"Repo '{repo_slug}' has no local_path — cannot check workplan files",
|
||
fixable=False,
|
||
)
|
||
_check_orphan_db(api_base, repo_id, set(), report)
|
||
return report
|
||
|
||
repo_dir = Path(repo_path)
|
||
workplans_dir = repo_dir / "workplans"
|
||
|
||
# C-01: workplans/ directory missing
|
||
if not workplans_dir.is_dir():
|
||
report.add(
|
||
severity="FAIL", check_id="C-01",
|
||
message=(
|
||
"workplans/ directory missing — ADR-001 requires workplan files "
|
||
"at <repo>/workplans/<ID>-<slug>.md"
|
||
),
|
||
fixable=False,
|
||
)
|
||
_check_orphan_db(api_base, repo_id, set(), report)
|
||
return report
|
||
|
||
# Parse workplan files
|
||
workplan_infos: list[tuple[Path, dict, str]] = []
|
||
file_ws_ids: dict[str, tuple[Path, dict, str]] = {} # ws_id → (file, meta, body)
|
||
active_file_ws_ids: set[str] = set()
|
||
|
||
for wp_file in iter_workplan_files(workplans_dir):
|
||
try:
|
||
text = wp_file.read_text(encoding="utf-8")
|
||
except OSError as e:
|
||
report.add(severity="FAIL", check_id="C-02",
|
||
message=f"Cannot read file: {e}", file_path=workplan_display_path(repo_dir, wp_file))
|
||
continue
|
||
if not text.startswith("---"):
|
||
report.add(severity="FAIL", check_id="C-02",
|
||
message="No YAML frontmatter found", file_path=workplan_display_path(repo_dir, wp_file))
|
||
continue
|
||
meta, body = parse_frontmatter(text)
|
||
if not meta or meta.get("_parse_error"):
|
||
report.add(severity="FAIL", check_id="C-02",
|
||
message="YAML frontmatter parse error", file_path=workplan_display_path(repo_dir, wp_file))
|
||
continue
|
||
workplan_infos.append((wp_file, meta, body))
|
||
ws_id = str(meta.get("state_hub_workstream_id", "")).strip().strip('"')
|
||
if ws_id:
|
||
file_ws_ids[ws_id] = (wp_file, meta, body)
|
||
if wp_file.parent == workplans_dir:
|
||
active_file_ws_ids.add(ws_id)
|
||
|
||
workplan_id_to_ws_id: dict[str, str] = {}
|
||
task_file_id_to_sh_id: dict[str, str] = {}
|
||
for _wp_file, meta, body in workplan_infos:
|
||
mapped_ws_id = str(meta.get("state_hub_workstream_id", "")).strip().strip('"')
|
||
wp_id = str(meta.get("id", "")).strip()
|
||
if wp_id and mapped_ws_id:
|
||
workplan_id_to_ws_id[wp_id] = mapped_ws_id
|
||
for task in get_tasks_from_workplan(meta, body):
|
||
if task.get("_parse_error"):
|
||
continue
|
||
task_file_id = str(task.get("id", "")).strip()
|
||
raw_sh = task.get("state_hub_task_id")
|
||
task_sh_id = "" if raw_sh is None else str(raw_sh).strip().strip('"')
|
||
if task_file_id and task_sh_id and task_sh_id not in ("~", "null", "None", "none"):
|
||
task_file_id_to_sh_id[task_file_id] = task_sh_id
|
||
|
||
# Per-workplan checks
|
||
for wp_file, meta, body in workplan_infos:
|
||
fname = workplan_display_path(repo_dir, wp_file)
|
||
archived_file = wp_file.parent.name == "archived"
|
||
ws_id = str(meta.get("state_hub_workstream_id", "")).strip().strip('"')
|
||
file_status = str(meta.get("status", "")).strip()
|
||
file_title = str(meta.get("title", "")).strip()
|
||
file_domain = str(meta.get("domain", "")).strip()
|
||
|
||
normalised_file_status = normalise_workstream_status(file_status)
|
||
if archived_file and normalised_file_status not in CLOSED_WORKSTREAM_STATUSES:
|
||
report.add(
|
||
severity="FAIL", check_id="C-18",
|
||
message="Archived workplan file has an open or planning status",
|
||
file_path=fname,
|
||
file_value=file_status,
|
||
fixable=False,
|
||
)
|
||
|
||
if not ws_id:
|
||
# C-06: workplan not linked to any DB workstream
|
||
report.add(
|
||
severity="WARN", check_id="C-06",
|
||
message="Workplan has no state_hub_workstream_id — not indexed in DB",
|
||
file_path=fname,
|
||
file_value=file_title or fname,
|
||
fixable=True,
|
||
_fix_context={
|
||
"wp_file": str(wp_file),
|
||
"meta": meta,
|
||
"body": body,
|
||
"repo_id": repo_id,
|
||
"domain": file_domain,
|
||
},
|
||
)
|
||
continue
|
||
|
||
ws = _api_get(api_base, f"/workstreams/{ws_id}")
|
||
if ws is None:
|
||
# C-03: stale workstream reference
|
||
report.add(
|
||
severity="FAIL", check_id="C-03",
|
||
message=f"state_hub_workstream_id {ws_id[:8]}… not found in DB (stale reference)",
|
||
file_path=fname,
|
||
db_id=ws_id,
|
||
fixable=False,
|
||
)
|
||
continue
|
||
|
||
# C-09: repo mismatch — file is here but DB says different repo
|
||
db_repo_id: str = ws.get("repo_id") or ""
|
||
if db_repo_id != repo_id:
|
||
report.add(
|
||
severity="FAIL", check_id="C-09",
|
||
message=(
|
||
f"Workstream '{ws.get('slug')}' repo_id in DB is "
|
||
f"{db_repo_id[:8] if db_repo_id else 'None'} "
|
||
f"but backing file lives in {repo_slug} ({repo_id[:8]}…)"
|
||
),
|
||
file_path=fname,
|
||
db_id=ws_id,
|
||
file_value=repo_id,
|
||
db_value=db_repo_id,
|
||
fixable=True,
|
||
_fix_context={"ws_id": ws_id, "correct_repo_id": repo_id},
|
||
)
|
||
# Continue to check drift even with mismatched repo
|
||
|
||
tasks = get_tasks_from_workplan(meta, body)
|
||
db_tasks = _api_get(api_base, "/tasks", {"workstream_id": ws_id})
|
||
file_task_statuses = [
|
||
str(task.get("status", "")).strip()
|
||
for task in tasks
|
||
if not task.get("_parse_error")
|
||
]
|
||
db_task_statuses = [
|
||
str(task.get("status", "")).strip()
|
||
for task in db_tasks
|
||
if isinstance(db_tasks, list)
|
||
] if isinstance(db_tasks, list) else []
|
||
active_task_requires_activation = should_activate_parent_for_active_tasks(
|
||
parent_workstream_status=file_status,
|
||
task_statuses=[*file_task_statuses, *db_task_statuses],
|
||
)
|
||
if active_task_requires_activation:
|
||
report.add(
|
||
severity="WARN",
|
||
check_id="C-23",
|
||
message=(
|
||
f"Lifecycle drift in '{ws.get('slug')}': workplan status "
|
||
f"{file_status!r} has an active task — repair to 'active'"
|
||
),
|
||
file_path=fname,
|
||
db_id=ws_id,
|
||
file_value=file_status,
|
||
db_value=ws.get("status", ""),
|
||
fixable=True,
|
||
_fix_context={
|
||
"ws_id": ws_id,
|
||
"wp_file": str(wp_file),
|
||
"file_status": file_status,
|
||
"db_status": ws.get("status", ""),
|
||
"target_status": "active",
|
||
},
|
||
)
|
||
|
||
# C-04: status drift — normalise file value before comparing so that
|
||
# legacy file/API aliases are not treated as drift. Lifecycle repairs
|
||
# take precedence so a stale planning file cannot regress active work.
|
||
db_status = ws.get("status", "")
|
||
normalised_db_status = normalise_workstream_status(db_status)
|
||
if (
|
||
not active_task_requires_activation
|
||
and file_status
|
||
and db_status
|
||
and normalised_file_status != normalised_db_status
|
||
):
|
||
report.add(
|
||
severity="WARN", check_id="C-04",
|
||
message=(
|
||
f"Status drift in '{ws.get('slug')}': "
|
||
f"file={file_status!r} db={db_status!r} (file wins)"
|
||
),
|
||
file_path=fname,
|
||
db_id=ws_id,
|
||
file_value=file_status,
|
||
db_value=db_status,
|
||
fixable=True,
|
||
_fix_context={
|
||
"ws_id": ws_id,
|
||
"field": "status",
|
||
"value": normalised_file_status, # always send DB-valid value
|
||
},
|
||
)
|
||
|
||
if normalised_file_status == "ready":
|
||
review = ready_review_status(
|
||
repo_dir,
|
||
meta.get("reviewed_against_commit"),
|
||
meta.get("context_paths"),
|
||
)
|
||
if review.needs_review:
|
||
detail = f"Ready workplan may be stale: {review.reason}"
|
||
if review.changed_paths:
|
||
preview = ", ".join(review.changed_paths[:5])
|
||
extra = "" if len(review.changed_paths) <= 5 else ", ..."
|
||
detail = f"{detail}; changed paths: {preview}{extra}"
|
||
report.add(
|
||
severity="WARN",
|
||
check_id="C-21",
|
||
message=detail,
|
||
file_path=fname,
|
||
file_value=file_status,
|
||
db_value="needs_review",
|
||
fixable=False,
|
||
)
|
||
|
||
# C-05: title drift
|
||
db_title = ws.get("title", "")
|
||
if file_title and db_title and file_title != db_title:
|
||
report.add(
|
||
severity="WARN", check_id="C-05",
|
||
message=(
|
||
f"Title drift in '{ws.get('slug')}': "
|
||
f"file={file_title!r} db={db_title!r} (file wins)"
|
||
),
|
||
file_path=fname,
|
||
db_id=ws_id,
|
||
file_value=file_title,
|
||
db_value=db_title,
|
||
fixable=True,
|
||
_fix_context={"ws_id": ws_id, "field": "title", "value": file_title},
|
||
)
|
||
|
||
planning_priority = str(meta.get("planning_priority", "")).strip() or None
|
||
if planning_priority != (ws.get("planning_priority") or None):
|
||
report.add(
|
||
severity="WARN", check_id="C-19",
|
||
message=(
|
||
f"Planning priority drift in '{ws.get('slug')}': "
|
||
f"file={planning_priority!r} db={ws.get('planning_priority')!r} (file wins)"
|
||
),
|
||
file_path=fname,
|
||
db_id=ws_id,
|
||
file_value=planning_priority,
|
||
db_value=ws.get("planning_priority"),
|
||
fixable=True,
|
||
_fix_context={"ws_id": ws_id, "field": "planning_priority", "value": planning_priority},
|
||
)
|
||
|
||
planning_order = _as_int_or_none(meta.get("planning_order"))
|
||
if planning_order != ws.get("planning_order"):
|
||
report.add(
|
||
severity="WARN", check_id="C-19",
|
||
message=(
|
||
f"Planning order drift in '{ws.get('slug')}': "
|
||
f"file={planning_order!r} db={ws.get('planning_order')!r} (file wins)"
|
||
),
|
||
file_path=fname,
|
||
db_id=ws_id,
|
||
file_value=planning_order,
|
||
db_value=ws.get("planning_order"),
|
||
fixable=True,
|
||
_fix_context={"ws_id": ws_id, "field": "planning_order", "value": planning_order},
|
||
)
|
||
|
||
# C-10, C-11, C-12: task-level checks
|
||
db_task_by_id: dict[str, dict] = {}
|
||
if isinstance(db_tasks, list):
|
||
for t in db_tasks:
|
||
db_task_by_id[t["id"]] = t
|
||
|
||
existing_deps = _api_get(api_base, f"/workstreams/{ws_id}/dependencies") or []
|
||
existing_dep_keys = set()
|
||
if isinstance(existing_deps, list):
|
||
for dep in existing_deps:
|
||
if dep.get("from_workstream_id") != ws_id:
|
||
continue
|
||
rel = dep.get("relationship_type") or "blocks"
|
||
if dep.get("to_workstream_id"):
|
||
existing_dep_keys.add(("workstream", dep["to_workstream_id"], rel))
|
||
if dep.get("to_task_id"):
|
||
existing_dep_keys.add(("task", dep["to_task_id"], rel))
|
||
|
||
for target_wp_id in _as_list(meta.get("depends_on_workplans")):
|
||
target_ws_id = workplan_id_to_ws_id.get(target_wp_id)
|
||
if not target_ws_id:
|
||
report.add(
|
||
severity="WARN",
|
||
check_id="C-20",
|
||
message=f"Workplan dependency target '{target_wp_id}' is not linked to State Hub",
|
||
file_path=fname,
|
||
file_value=target_wp_id,
|
||
fixable=False,
|
||
)
|
||
continue
|
||
dep_key = ("workstream", target_ws_id, "blocks")
|
||
if dep_key not in existing_dep_keys:
|
||
report.add(
|
||
severity="WARN",
|
||
check_id="C-20",
|
||
message=f"Missing DB dependency edge: {ws_id[:8]}… depends on workplan {target_wp_id}",
|
||
file_path=fname,
|
||
db_id=ws_id,
|
||
file_value=target_wp_id,
|
||
fixable=True,
|
||
_fix_context={
|
||
"from_workstream_id": ws_id,
|
||
"to_workstream_id": target_ws_id,
|
||
"relationship_type": "blocks",
|
||
},
|
||
)
|
||
|
||
for target_task_id in _as_list(meta.get("depends_on_tasks")):
|
||
target_sh_id = task_file_id_to_sh_id.get(target_task_id)
|
||
if not target_sh_id:
|
||
report.add(
|
||
severity="WARN",
|
||
check_id="C-20",
|
||
message=f"Task dependency target '{target_task_id}' is not linked to State Hub",
|
||
file_path=fname,
|
||
file_value=target_task_id,
|
||
fixable=False,
|
||
)
|
||
continue
|
||
dep_key = ("task", target_sh_id, "starts_after")
|
||
if dep_key not in existing_dep_keys:
|
||
report.add(
|
||
severity="WARN",
|
||
check_id="C-20",
|
||
message=f"Missing DB dependency edge: {ws_id[:8]}… starts after task {target_task_id}",
|
||
file_path=fname,
|
||
db_id=ws_id,
|
||
file_value=target_task_id,
|
||
fixable=True,
|
||
_fix_context={
|
||
"from_workstream_id": ws_id,
|
||
"to_task_id": target_sh_id,
|
||
"relationship_type": "starts_after",
|
||
},
|
||
)
|
||
|
||
file_task_sh_ids: set[str] = set()
|
||
|
||
for task in tasks:
|
||
if task.get("_parse_error"):
|
||
continue
|
||
t_id = str(task.get("id", "")).strip()
|
||
_raw_sh = task.get("state_hub_task_id")
|
||
t_sh_id = "" if _raw_sh is None else str(_raw_sh).strip().strip('"')
|
||
if t_sh_id in ("~", "null", "None", "none"):
|
||
t_sh_id = ""
|
||
t_status = normalise_task_status(task.get("status", "todo"))
|
||
|
||
if t_sh_id:
|
||
file_task_sh_ids.add(t_sh_id)
|
||
db_task = _api_get(api_base, f"/tasks/{t_sh_id}")
|
||
if db_task is None:
|
||
report.add(
|
||
severity="FAIL", check_id="C-03",
|
||
message=f"state_hub_task_id {t_sh_id[:8]}… not found in DB",
|
||
file_path=f"{fname}#{t_id}",
|
||
db_id=t_sh_id,
|
||
fixable=False,
|
||
)
|
||
continue
|
||
# C-10 / C-15: task status drift
|
||
db_t_status = normalise_task_status(db_task.get("status", "todo"))
|
||
if t_status and db_t_status and t_status != db_t_status:
|
||
db_rank = STATUS_ORDER.get(db_t_status, 0)
|
||
file_rank = STATUS_ORDER.get(t_status, 0)
|
||
if db_rank >= file_rank:
|
||
# C-15: DB is already at the same rank or ahead — prevent
|
||
# regression. Writeback syncs the file to match DB.
|
||
report.add(
|
||
severity="WARN", check_id="C-15",
|
||
message=(
|
||
f"DB task '{t_id}' is ahead of file "
|
||
f"(db={db_t_status!r}, file={t_status!r}) "
|
||
f"— regression prevented; writeback will sync file"
|
||
),
|
||
file_path=f"{fname}#{t_id}",
|
||
db_id=t_sh_id,
|
||
file_value=t_status,
|
||
db_value=db_t_status,
|
||
fixable=True,
|
||
_fix_context={
|
||
"task_id": t_sh_id,
|
||
"wp_file": str(wp_file),
|
||
"task_block_id": t_id,
|
||
"db_status": db_t_status,
|
||
},
|
||
)
|
||
else:
|
||
# C-10: file is ahead — apply file→DB sync (normal drift)
|
||
report.add(
|
||
severity="WARN", check_id="C-10",
|
||
message=(
|
||
f"Task status drift '{t_id}': "
|
||
f"file={t_status!r} db={db_t_status!r} (file wins)"
|
||
),
|
||
file_path=f"{fname}#{t_id}",
|
||
db_id=t_sh_id,
|
||
file_value=t_status,
|
||
db_value=db_t_status,
|
||
fixable=True,
|
||
_fix_context={"task_id": t_sh_id, "status": t_status},
|
||
)
|
||
file_description = task.get("description")
|
||
if isinstance(file_description, str):
|
||
file_description = file_description.strip() or None
|
||
else:
|
||
file_description = None
|
||
db_description = db_task.get("description")
|
||
if isinstance(db_description, str):
|
||
db_description = db_description.strip() or None
|
||
else:
|
||
db_description = None
|
||
if file_description and file_description != db_description:
|
||
report.add(
|
||
severity="WARN", check_id="C-22",
|
||
message=f"Task description drift '{t_id}': file content differs from DB (file wins)",
|
||
file_path=f"{fname}#{t_id}",
|
||
db_id=t_sh_id,
|
||
file_value=file_description[:120],
|
||
db_value=(db_description or "")[:120],
|
||
fixable=True,
|
||
_fix_context={"task_id": t_sh_id, "description": file_description},
|
||
)
|
||
elif t_id:
|
||
# C-11: task exists in file but not linked to DB
|
||
ws_status = ws.get("status", "")
|
||
report.add(
|
||
severity="WARN", check_id="C-11",
|
||
message=f"Task '{t_id}' has no state_hub_task_id",
|
||
file_path=f"{fname}#{t_id}",
|
||
fixable=True,
|
||
_fix_context={
|
||
"ws_id": ws_id,
|
||
"ws_status": ws_status,
|
||
"task": task,
|
||
"wp_file": str(wp_file),
|
||
"meta": meta,
|
||
"body": body,
|
||
},
|
||
)
|
||
|
||
# C-12: DB tasks with no file backing
|
||
if isinstance(db_tasks, list):
|
||
ws_status = ws.get("status", "")
|
||
ws_finished = normalise_workstream_status(ws_status) in CLOSED_WORKSTREAM_STATUSES
|
||
for db_t in db_tasks:
|
||
if db_t["id"] not in file_task_sh_ids:
|
||
db_t_status = db_t.get("status", "")
|
||
open_task = db_t_status not in TERMINAL_TASK_STATUSES
|
||
# Auto-cancel fixable when workstream is finished and task is open
|
||
fixable = ws_finished and open_task
|
||
report.add(
|
||
severity="WARN", check_id="C-12",
|
||
message=(
|
||
f"DB task '{db_t.get('title', '')}' "
|
||
f"(id={db_t['id'][:8]}…, status={db_t_status}) "
|
||
f"in workstream '{ws.get('slug')}' has no file backing"
|
||
),
|
||
db_id=db_t["id"],
|
||
fixable=fixable,
|
||
_fix_context={
|
||
"task_id": db_t["id"],
|
||
"ws_finished": ws_finished,
|
||
},
|
||
)
|
||
|
||
# C-13: all DB tasks done but workstream still active — worker forgot to close
|
||
db_status = ws.get("status", "")
|
||
if normalise_workstream_status(db_status) == "active" and isinstance(db_tasks, list) and db_tasks:
|
||
non_terminal = [
|
||
t for t in db_tasks
|
||
if normalise_task_status(t.get("status", "todo")) not in TERMINAL_TASK_STATUSES
|
||
]
|
||
if not non_terminal:
|
||
report.add(
|
||
severity="WARN", check_id="C-13",
|
||
message=(
|
||
f"All {len(db_tasks)} DB tasks for '{ws.get('slug')}' are "
|
||
f"done/cancel but workstream status is still 'active' — "
|
||
f"worker likely forgot update_workstream_status()"
|
||
),
|
||
file_path=fname,
|
||
db_id=ws_id,
|
||
db_value="active",
|
||
fixable=True,
|
||
_fix_context={
|
||
"ws_id": ws_id,
|
||
"field": "status",
|
||
"value": "finished",
|
||
},
|
||
)
|
||
|
||
# C-07 / C-08: orphan DB workstreams (have repo_id=this_repo but no backing file)
|
||
_check_orphan_db(api_base, repo_id, set(file_ws_ids.keys()), report, active_file_ws_ids)
|
||
|
||
# C-14: ghost duplicate — active workstream on same topic with no repo_id whose
|
||
# title matches a file-backed workstream. Root cause: create_workstream() called
|
||
# before the workplan file was written; fix-consistency then created a second
|
||
# workstream from the file, leaving the first as an invisible orphan.
|
||
_check_ghost_duplicates(api_base, workplan_infos, file_ws_ids, report)
|
||
|
||
return report
|
||
|
||
|
||
def _check_orphan_db(
    api_base: str,
    repo_id: str,
    file_ws_ids: set[str],
    report: ConsistencyReport,
    active_file_ws_ids: set[str] | None = None,
) -> None:
    """Flag DB workstreams with repo_id=this_repo that have no backing workplan file.

    A non-closed DB workstream must be backed by an id in *active_file_ws_ids*;
    otherwise it is reported as C-07 (FAIL). A closed workstream (status in
    CLOSED_WORKSTREAM_STATUSES after normalisation) only needs to appear in
    *file_ws_ids*; otherwise it is reported as C-08 (INFO).

    Args:
        api_base: State-hub API base URL.
        repo_id: DB id of the repo whose workstreams are listed.
        file_ws_ids: Workstream ids referenced by any workplan file.
        report: Report that findings are added to.
        active_file_ws_ids: Workstream ids referenced by active workplan
            files. Defaults to *file_ws_ids* only when omitted (None).
    """
    # Fall back only when the argument was omitted: an explicitly empty set
    # means "no active workplan files" and must not widen to every file id
    # (which would hide C-07 for workstreams backed only by non-active files).
    if active_file_ws_ids is None:
        active_file_ws_ids = file_ws_ids

    all_ws = _api_get(api_base, "/workstreams", {"repo_id": repo_id})
    if not isinstance(all_ws, list):
        return

    for ws in all_ws:
        ws_id = ws["id"]
        ws_status = ws.get("status", "")
        ws_slug = ws.get("slug", "")
        is_closed = normalise_workstream_status(ws_status) in CLOSED_WORKSTREAM_STATUSES

        if is_closed:
            if ws_id in file_ws_ids:
                continue
            report.add(
                severity="INFO", check_id="C-08",
                message=(
                    f"Closed DB workstream '{ws_slug}' "
                    f"(id={ws_id[:8]}…, status={ws_status}) has no backing workplan file"
                ),
                db_id=ws_id,
                fixable=False,
            )
        else:
            if ws_id in active_file_ws_ids:
                continue
            report.add(
                severity="FAIL", check_id="C-07",
                message=(
                    f"Non-closed DB workstream '{ws_slug}' (id={ws_id[:8]}…) "
                    f"has no backing workplan file — ADR-001 violation"
                ),
                db_id=ws_id,
                fixable=False,
            )
|
||
|
||
|
||
def _check_ghost_duplicates(
    api_base: str,
    workplan_infos: list[tuple],
    file_ws_ids: dict[str, tuple],
    report: ConsistencyReport,
) -> None:
    """C-14: detect active workstreams with repo_id=null whose title matches a
    file-backed workstream — these are ghosts created by premature create_workstream()
    calls before the workplan file existed.

    Titles are compared case-insensitively after stripping. Only workstreams
    in topics used by file-backed workstreams, and in one of
    OPEN_WORKSTREAM_STATUSES, are examined.

    Args:
        api_base: State-hub API base URL.
        workplan_infos: 3-tuples whose middle element is the workplan
            frontmatter dict.
        file_ws_ids: Mapping whose keys are file-backed workstream ids.
        report: Report that C-14 findings are added to.
    """
    # Placeholder spellings that mean "not linked" (same set check_repo uses
    # for state_hub_task_id); str(None) would otherwise yield a bogus "None" id.
    null_ids = ("", "~", "null", "None", "none")

    # Build lookup: normalised title → file-backed workstream id
    file_titles: dict[str, str] = {}
    for _, meta, _ in workplan_infos:
        raw_id = meta.get("state_hub_workstream_id")
        ws_id = "" if raw_id is None else str(raw_id).strip().strip('"')
        raw_title = meta.get("title")
        title = "" if raw_title is None else str(raw_title).strip().lower()
        if title and ws_id not in null_ids:
            file_titles[title] = ws_id

    if not file_titles:
        return

    # Gather topic_ids from all file-backed workstreams so we can query by topic
    topic_ids: set[str] = set()
    for ws_id in file_ws_ids:
        ws = _api_get(api_base, f"/workstreams/{ws_id}")
        if ws and ws.get("topic_id"):
            topic_ids.add(ws["topic_id"])

    for topic_id in topic_ids:
        topic_ws: list[dict] = []
        for status in OPEN_WORKSTREAM_STATUSES:
            status_rows = _api_get(api_base, "/workstreams", {"topic_id": topic_id, "status": status})
            if isinstance(status_rows, list):
                topic_ws.extend(status_rows)

        for ws in topic_ws:
            ws_id = ws["id"]
            if ws_id in file_ws_ids:
                continue  # legitimately linked
            if ws.get("repo_id"):
                continue  # C-07 covers repo-scoped orphans
            # A null title must not crash the whole check run.
            ws_title = (ws.get("title") or "").strip().lower()
            if ws_title not in file_titles:
                continue
            file_backed_id = file_titles[ws_title]
            report.add(
                severity="WARN",
                check_id="C-14",
                message=(
                    f"Ghost duplicate: active workstream '{ws.get('slug')}' "
                    f"(id={ws_id[:8]}…, repo_id=null) has same title as "
                    f"file-backed workstream {file_backed_id[:8]}… — "
                    f"likely created via create_workstream() before workplan file existed; "
                    f"archive it"
                ),
                db_id=ws_id,
                fixable=False,
            )
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Git sync (T02–T04: pull gate, writeback, push seal) — see repo_sync.py
|
||
# ---------------------------------------------------------------------------
|
||
# repo_sync.py owns the push-seal invariant and all git lifecycle primitives.
|
||
# The aliases below keep internal call sites and existing tests unchanged.
|
||
|
||
from repo_sync import ( # noqa: E402 (import after top-level imports intentional)
|
||
count_local_ahead as _detect_ahead_of_remote,
|
||
count_remote_ahead as _count_remote_ahead,
|
||
pull_ff as _git_pull,
|
||
push_ff as _git_push,
|
||
)
|
||
|
||
|
||
def _detect_behind_remote(repo_path: str) -> bool:
    """Report whether the remote tracking branch has commits *repo_path* lacks.

    This is the C-16 predicate. It is a thin wrapper over
    repo_sync.count_remote_ahead (imported as ``_count_remote_ahead``), which
    fetches before counting. The wrapper relies on that helper reporting 0
    on failure, so an error never spuriously triggers C-16.
    """
    commits_only_on_remote = _count_remote_ahead(repo_path)
    return commits_only_on_remote > 0
|
||
|
||
|
||
def _patch_task_status_in_file(
    file_path: Path, task_block_id: str, new_status: str
) -> bool:
    """Update the ``status:`` field inside a task block identified by its id.

    Only modifies lines inside a ```task … ``` fenced block (as matched by
    ``_TASK_BLOCK_RE``) whose parsed ``id:`` equals *task_block_id*. Within
    that block, the first column-0 ``status:`` line has its value replaced by
    *new_status*. Everything else is left untouched.

    Returns:
        True if the file content changed and was rewritten. False if no
        matching block or status line was found, or the status already equals
        *new_status*.
    """
    text = file_path.read_text(encoding="utf-8")

    # Horizontal whitespace only: with ``\s*`` an empty ``status:`` value would
    # swallow the newline and the replacement would clobber the first token of
    # the following line.
    status_line_re = re.compile(r"^(status:[ \t]*)\S+", re.MULTILINE)

    def _replace(m: re.Match) -> str:
        # m.group(0) is the full ```task...``` block including fences.
        # m.group(1) is the inner YAML content (no fences).
        full_block = m.group(0)
        task_meta = _parse_yaml_block(m.group(1).strip())
        if str(task_meta.get("id", "")).strip() != task_block_id:
            return full_block
        # A callable replacement inserts new_status literally, so backslashes
        # or "\g<...>" in the value are never treated as template syntax.
        # count=1: a task block has a single top-level status key.
        return status_line_re.sub(
            lambda sm: sm.group(1) + new_status, full_block, count=1
        )

    new_text = _TASK_BLOCK_RE.sub(_replace, text)
    if new_text == text:
        return False
    file_path.write_text(new_text, encoding="utf-8")
    return True
|
||
|
||
|
||
def _git_commit_writeback(
    repo_path: str,
    file_path: Path,
    changes: list[str],
    *,
    subject: str = "chore(consistency): sync task status from DB [auto]",
) -> bool:
    """Stage *file_path* and commit it with a standard writeback message.

    Only *file_path* is committed: it is passed to ``git commit`` as a
    pathspec, so changes a user already had staged elsewhere in the repo stay
    in the index instead of being swept into the automated commit.

    Args:
        repo_path: Working tree passed to ``git -C``.
        file_path: File to stage and commit.
        changes: Human-readable change descriptions, one bullet each in the
            commit body.
        subject: First line of the commit message.

    Returns:
        True when both ``git add`` and ``git commit`` succeed. False when git
        exits non-zero or cannot be executed at all. Errors are printed to
        stderr and never raised, so they do not abort the consistency run.
    """
    import os as _os
    from datetime import date as _date

    summary = "\n".join(f" - {c}" for c in changes)
    msg = (
        f"{subject}\n\n"
        f"Updated by fix-consistency on {_date.today().isoformat()}:\n"
        f"{summary}"
    )
    # Pass GIT_CUSTODIAN_SYNC=1 so the post-commit hook can detect it is
    # running from within a sync pass and exit early, preventing re-entrancy.
    sync_env = {**_os.environ, "GIT_CUSTODIAN_SYNC": "1"}
    target = str(file_path)
    try:
        subprocess.run(
            ["git", "-C", repo_path, "add", "--", target],
            check=True, capture_output=True, env=sync_env,
        )
        subprocess.run(
            ["git", "-C", repo_path, "commit", "-m", msg, "--", target],
            check=True, capture_output=True, env=sync_env,
        )
    except subprocess.CalledProcessError as e:
        # git reports some failures (e.g. "nothing to commit") on stdout.
        detail = (e.stderr or e.stdout or b"").decode(errors="replace").strip()
        print(f"WARN: git commit failed for writeback: {detail}", file=sys.stderr)
        return False
    except OSError as e:
        # git binary missing / not executable: honour the no-raise contract.
        print(f"WARN: git commit failed for writeback: {e}", file=sys.stderr)
        return False
    return True
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Worker orientation brief (.custodian-brief.md)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# First line of every generated brief; marks the file as machine-written.
_BRIEF_HEADER = "<!-- custodian-brief: generated by fix-consistency — do not edit manually -->"

# Glyph printed before each task in the brief, keyed by normalised task status.
_TASK_STATUS_ICON = dict(
    done="✓",
    cancel="✗",
    progress="►",
    wait="!",
    todo="·",
)

# Normalised task statuses that are listed under "Open tasks" in the brief.
_OPEN_STATUSES = {*OPEN_TASK_STATUSES}
|
||
|
||
|
||
def _write_custodian_brief(api_base: str, repo_slug: str, repo_path: str) -> bool:
    """Generate .custodian-brief.md at the repo root and git-commit if changed.

    The brief gives any agent — including subagents without MCP access and
    workers on remote machines — instant orientation without a live hub
    connection. It lists the repo's domain, its first active goal, and, for
    each open workstream, progress plus up to seven open tasks.

    Args:
        api_base: State-hub API base URL.
        repo_slug: Registered repo slug.
        repo_path: Local checkout in which the brief is written and committed.

    Returns:
        True if the file was written, meaning its content changed apart from
        the "Last synced" timestamp line. False if the repo is unknown to the
        hub or nothing meaningful changed. The commit outcome does not affect
        the return value.
    """
    import datetime as _dt
    from datetime import timezone as _tz

    max_listed_tasks = 7
    # Show wait first, then progress, then todo.
    open_task_rank = {"wait": 0, "progress": 1, "todo": 2}

    repo = _api_get(api_base, f"/repos/{repo_slug}")
    if not repo:
        return False

    repo_id: str = repo.get("id", "")
    domain_slug: str = ""

    # Resolve domain slug: prefer open workstreams, fall back to any workstream
    # so that a fully-finished repo doesn't degrade to "(unknown)".
    workstreams: list[dict] = []
    for status in OPEN_WORKSTREAM_STATUSES:
        rows = _api_get(api_base, "/workstreams", {"repo_id": repo_id, "status": status}) or []
        if isinstance(rows, list):
            workstreams.extend(rows)

    ws_for_domain: list[dict] = workstreams
    if not ws_for_domain:
        all_ws = _api_get(api_base, "/workstreams", {"repo_id": repo_id}) or []
        ws_for_domain = all_ws if isinstance(all_ws, list) else []

    # Use the first workstream that actually has a topic_id: an empty id would
    # request "/topics/" (the collection) and a list is not a topic record.
    topic_id = next((w.get("topic_id") for w in ws_for_domain if w.get("topic_id")), None)
    if topic_id:
        topic = _api_get(api_base, f"/topics/{topic_id}")
        if isinstance(topic, dict):
            domain_slug = topic.get("domain_slug") or ""

    # Active repo goal (first active one if multiple)
    goal_text = ""
    goals = _api_get(api_base, "/repo-goals", {"repo_slug": repo_slug}) or []
    if isinstance(goals, list):
        active_goals = [g for g in goals if g.get("status") == "active"]
        if active_goals:
            g = active_goals[0]
            goal_text = g.get("title", "") or g.get("description", "")

    now_utc = _dt.datetime.now(_tz.utc)
    ts = now_utc.strftime("%Y-%m-%d %H:%M UTC")

    lines = [
        _BRIEF_HEADER,
        f"# Custodian Brief — {repo_slug}",
        "",
        f"**Domain:** {domain_slug or '(unknown)'} ",
        f"**Last synced:** {ts} ",
        "**State Hub:** http://127.0.0.1:8000 *(adjust if running on a remote machine)*",
        "",
    ]

    if goal_text:
        lines += ["## Current Goal", "", goal_text, ""]

    if workstreams:
        lines.append("## Active Workstreams")
        for ws in workstreams:
            ws_title = ws.get("title", ws.get("slug", "?"))
            ws_id = ws["id"]
            tasks = _api_get(api_base, "/tasks", {"workstream_id": ws_id}) or []
            if not isinstance(tasks, list):
                tasks = []

            # Normalise each task's status once; reused for counting and sorting.
            with_status = [(t, normalise_task_status(t.get("status", "todo"))) for t in tasks]
            done = sum(1 for _, s in with_status if s in TERMINAL_TASK_STATUSES)
            total = len(tasks)
            pct = f"{done}/{total}" if total else "no tasks"

            # sorted() is stable, so tasks of equal rank keep their API order.
            open_tasks = sorted(
                ((t, s) for t, s in with_status if s in _OPEN_STATUSES),
                key=lambda pair: open_task_rank.get(pair[1], 9),
            )

            lines += [
                "",
                f"### {ws_title}",
                f"Progress: {pct} done | workstream_id: `{ws_id}`",
            ]

            if open_tasks:
                lines.append("")
                lines.append("**Open tasks:**")
                for t, status in open_tasks[:max_listed_tasks]:
                    icon = _TASK_STATUS_ICON.get(status, "·")
                    tid = t["id"]
                    title = t.get("title", tid)
                    blocker = t.get("blocking_reason", "")
                    task_line = f"- {icon} {title} `{tid[:8]}`"
                    if status == "wait" and blocker:
                        task_line += f"\n *(wait: {blocker})*"
                    lines.append(task_line)
                if len(open_tasks) > max_listed_tasks:
                    lines.append(f"- … and {len(open_tasks) - max_listed_tasks} more open tasks")
    else:
        lines += ["## Active Workstreams", "", "*(none — repo may need first-session setup)*"]

    lines += [
        "",
        "---",
        "## MCP Orientation (when available)",
        "",
        "If the state-hub MCP server is reachable, call:",
        f"`get_domain_summary(\"{domain_slug}\")`",
        "This provides richer cross-domain context.",
        "If the MCP call fails, use this file as your orientation source.",
    ]

    content = "\n".join(lines) + "\n"

    brief_path = Path(repo_path) / ".custodian-brief.md"
    existing = brief_path.read_text(encoding="utf-8") if brief_path.exists() else ""

    # Strip the timestamp line before comparing to avoid spurious writes
    def _strip_ts(text: str) -> str:
        return "\n".join(
            ln for ln in text.splitlines()
            if not ln.startswith("**Last synced:**")
        )

    if _strip_ts(content) == _strip_ts(existing):
        return False  # no meaningful change

    brief_path.write_text(content, encoding="utf-8")

    # Commit the brief so remote workers can pull it
    _git_commit_writeback(
        repo_path,
        brief_path,
        [f"update .custodian-brief.md for {repo_slug}"],
    )
    return True
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Fix engine
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def fix_repo(
    api_base: str,
    repo_slug: str,
    repo_path_override: str | None = None,
    no_writeback: bool = False,
) -> ConsistencyReport:
    """Run checks then apply all auto-fixable issues. Returns updated report.

    Sequence:
      1. ``check_repo``. Return immediately if it reported a C-00 failure.
      2. Register this host's checkout path under the repo's ``host_paths``.
      3. Stop without writing if the repo is behind its remote (C-16), or if
         backlogged local commits cannot be pushed (C-17).
      4. Apply every fixable issue. Each fix is isolated, so one failure is
         recorded and the rest continue.
      5. Stamp ``last_state_synced_at``, regenerate ``.custodian-brief.md``,
         and push the commits made during the run.

    Args:
        api_base: State-hub API base URL.
        repo_slug: Registered repo slug.
        repo_path_override: Local checkout path forwarded to ``check_repo``.
        no_writeback: When True, skip fixes that edit workplan files (C-15 and
            the file half of C-23); DB-side fixes still run.

    Returns:
        The report from ``check_repo``, with ``fixes_applied`` extended by one
        line per attempted action.
    """
    report = check_repo(api_base, repo_slug, repo_path_override)
    if any(i.check_id == "C-00" for i in report.failures):
        return report

    def _succeeded(result) -> bool:
        # Failed API calls carry an "_error" key, whose value is reported in
        # the FAILED lines below. A None result is not reported as a failure.
        return result is not None and "_error" not in result

    def _created(record) -> bool:
        # A create call is only usable if it returned a record with an id.
        return isinstance(record, dict) and "_error" not in record and "id" in record

    def _create_linked_task(target_ws_id: str, task: dict, wp_file: Path) -> str | None:
        """Create a DB task from a workplan task block and record its id in the file.

        Returns the new DB task id, or None if the API did not return a
        usable record (in which case the file is left untouched).
        """
        t_id = str(task.get("id", "")).strip()
        t_status = normalise_task_status(task.get("status", "todo"))
        t_priority = str(task.get("priority", "medium")).strip()
        if t_priority not in VALID_TASK_PRIORITIES:
            t_priority = "medium"
        t_data = _api_post(api_base, "/tasks", {
            "workstream_id": target_ws_id,
            "title": str(task.get("title", t_id)).strip() or t_id,
            "description": task.get("description") or None,
            "status": t_status,
            "priority": t_priority,
            "assignee": task.get("assignee") or None,
        })
        if not _created(t_data):
            return None
        t_db_id = t_data["id"]
        # Prefer writing the id into the task's fenced block; fall back to the
        # frontmatter list when the block could not be patched.
        if not _inject_task_id_into_block(wp_file, "state_hub_task_id", t_db_id, t_id):
            _inject_task_id_frontmatter_list(wp_file, t_db_id, t_id)
        return t_db_id

    # Auto-register this machine's path in host_paths so future runs work
    # without --repo-path. Idempotent: skipped when already correct.
    repo_path = report.repo_path
    if repo_path:
        repo_record = _api_get(api_base, f"/repos/{repo_slug}")
        if repo_record:
            hostname = socket.gethostname()
            if (repo_record.get("host_paths") or {}).get(hostname) != repo_path:
                result = _api_post(
                    api_base, f"/repos/{repo_slug}/paths",
                    {"host": hostname, "path": repo_path},
                )
                if result and "_error" not in result:
                    report.fixes_applied.append(
                        f"host_paths[{hostname}] → {repo_path}"
                    )

    # T02 — pull gate: warn and skip all write operations when local repo is
    # behind its remote tracking branch.
    if repo_path and _detect_behind_remote(repo_path):
        report.add(
            severity="WARN", check_id="C-16",
            message=(
                f"Repo '{repo_slug}' is behind its remote tracking branch — "
                f"pull before fixing to avoid clobbering remote progress. "
                f"Run: git -C {repo_path} pull --ff-only"
            ),
            fixable=False,
        )
        report.fixes_applied.append(
            "C-16: all write operations skipped — local repo is behind remote"
        )
        return report

    # C-17: backlog guard — if local has unpushed commits from a prior failed push,
    # try to push them before making more. Skipping writes prevents runaway divergence.
    if repo_path:
        ahead = _detect_ahead_of_remote(repo_path)
        if ahead > 0:
            push_ok, push_msg = _git_push(repo_path)
            if not push_ok:
                report.add(
                    severity="WARN", check_id="C-17",
                    message=(
                        f"Repo '{repo_slug}' has {ahead} unpushed commit(s) and push "
                        f"failed ({push_msg}) — skipping writes to prevent runaway divergence"
                    ),
                    fixable=False,
                )
                report.fixes_applied.append(
                    "C-17: all write operations skipped — unpushed commits, push failed"
                )
                return report
            # Push succeeded — local is now in sync; proceed normally
            report.fixes_applied.append(f"C-17 cleared: pushed {ahead} backlogged commit(s)")

    fixable_issues = [i for i in report.issues if i.fixable]
    topics_cache = None  # "/topics" is fetched lazily, at most once per successful call

    for issue in fixable_issues:
        ctx = issue._fix_context
        try:
            if issue.check_id in ("C-04", "C-05", "C-13", "C-19"):
                ws_id = ctx["ws_id"]
                result = _api_patch(api_base, f"/workstreams/{ws_id}",
                                    {ctx["field"]: ctx["value"]})
                if _succeeded(result):
                    report.fixes_applied.append(
                        f"{issue.check_id} fixed: workstream {ws_id[:8]}… "
                        f"{ctx['field']} → {ctx['value']!r}"
                    )
                elif result is not None:
                    report.fixes_applied.append(
                        f"{issue.check_id} FAILED: workstream {ws_id[:8]}… "
                        f"{ctx['field']} → {ctx['value']!r}: {result['_error']}"
                    )

            elif issue.check_id == "C-23":
                ws_id = ctx["ws_id"]
                target_status = ctx["target_status"]
                result = _api_patch(api_base, f"/workstreams/{ws_id}", {"status": target_status})
                if _succeeded(result):
                    report.fixes_applied.append(
                        f"C-23 fixed: workstream {ws_id[:8]}… status → {target_status!r}"
                    )
                elif result is not None:
                    report.fixes_applied.append(
                        f"C-23 FAILED: workstream {ws_id[:8]}… "
                        f"status → {target_status!r}: {result['_error']}"
                    )

                if no_writeback:
                    report.fixes_applied.append(
                        f"C-23 skipped file repair (--no-writeback): {Path(ctx['wp_file']).name}"
                    )
                else:
                    wp_file = Path(ctx["wp_file"])
                    old_status = ctx["file_status"]
                    if _patch_frontmatter_field(wp_file, "status", target_status):
                        committed = _git_commit_writeback(
                            repo_path,
                            wp_file,
                            [f"workplan status: {old_status} → {target_status}"],
                            subject="chore(consistency): renormalize lifecycle state [auto]",
                        )
                        suffix = " (committed)" if committed else " (file patched, commit failed)"
                        report.fixes_applied.append(
                            f"C-23 fixed: {wp_file.name} status "
                            f"{old_status} → {target_status}{suffix}"
                        )
                    else:
                        report.fixes_applied.append(
                            f"C-23 SKIP: {wp_file.name} already has status {target_status!r}"
                        )

            elif issue.check_id == "C-06":
                wp_file = Path(ctx["wp_file"])
                meta = ctx["meta"]
                domain = ctx["domain"]
                repo_id_val = ctx["repo_id"]
                body = ctx.get("body", "")
                wp_id = str(meta.get("id", "")).strip()
                title = str(meta.get("title", "")).strip()
                status = normalise_workstream_status(str(meta.get("status", "active")).strip())
                if status not in VALID_WP_STATUSES:
                    status = "active"

                # Find topic_id for this domain
                if not isinstance(topics_cache, list):
                    topics_cache = _api_get(api_base, "/topics")
                topics = topics_cache if isinstance(topics_cache, list) else []
                topic_id = next(
                    (t["id"] for t in topics if t.get("domain_slug") == domain), None
                )
                if topic_id is None:
                    report.fixes_applied.append(
                        f"C-06 SKIP {wp_id}: no topic found for domain '{domain}'"
                    )
                    continue

                slug = re.sub(r"[^a-z0-9-]", "-", wp_id.lower()).strip("-")
                ws_data = _api_post(api_base, "/workstreams", {
                    "topic_id": topic_id,
                    "repo_id": repo_id_val,
                    "slug": slug,
                    "title": title or wp_id,
                    "status": status,
                    "owner": str(meta.get("owner", "")).strip() or None,
                    "planning_priority": str(meta.get("planning_priority", "")).strip() or None,
                    "planning_order": _as_int_or_none(meta.get("planning_order")),
                })
                if not _created(ws_data):
                    # An "_error" response used to reach ws_data["id"] and
                    # surface as an opaque KeyError; report it explicitly.
                    detail = ws_data.get("_error") if isinstance(ws_data, dict) else None
                    report.fixes_applied.append(
                        f"C-06 FAIL {wp_id}: could not create workstream in DB"
                        + (f": {detail}" if detail else "")
                    )
                    continue

                new_ws_id = ws_data["id"]
                _add_frontmatter_field(wp_file, "state_hub_workstream_id", new_ws_id)
                report.fixes_applied.append(
                    f"C-06 fixed: created workstream {new_ws_id[:8]}… "
                    f"for {wp_id}, wrote ID to {wp_file.name}"
                )

                # Create tasks and inject IDs
                for task in get_tasks_from_workplan(meta, body):
                    if task.get("_parse_error"):
                        continue
                    t_id = str(task.get("id", "")).strip()
                    if not t_id:
                        continue
                    t_db_id = _create_linked_task(new_ws_id, task, wp_file)
                    if t_db_id:
                        report.fixes_applied.append(f"  + task {t_id} → {t_db_id[:8]}…")
                    else:
                        report.fixes_applied.append(
                            f"  ! task {t_id}: could not create task in DB"
                        )

            elif issue.check_id == "C-09":
                ws_id = ctx["ws_id"]
                correct_repo_id = ctx["correct_repo_id"]
                result = _api_patch(api_base, f"/workstreams/{ws_id}",
                                    {"repo_id": correct_repo_id})
                if _succeeded(result):
                    report.fixes_applied.append(
                        f"C-09 fixed: workstream {ws_id[:8]}… "
                        f"repo_id → {correct_repo_id[:8]}…"
                    )
                elif result is not None:
                    report.fixes_applied.append(
                        f"C-09 FAILED: workstream {ws_id[:8]}… "
                        f"repo_id → {correct_repo_id[:8]}…: {result['_error']}"
                    )

            elif issue.check_id == "C-20":
                from_workstream_id = ctx["from_workstream_id"]
                dep_payload = {
                    "to_workstream_id": ctx.get("to_workstream_id"),
                    "to_task_id": ctx.get("to_task_id"),
                    "relationship_type": ctx["relationship_type"],
                }
                result = _api_post(
                    api_base, f"/workstreams/{from_workstream_id}/dependencies", dep_payload
                )
                if _succeeded(result):
                    target = ctx.get("to_workstream_id") or ctx.get("to_task_id")
                    report.fixes_applied.append(
                        f"C-20 fixed: dependency {from_workstream_id[:8]}… "
                        f"{ctx['relationship_type']} → {target[:8]}…"
                    )
                elif result is not None:
                    report.fixes_applied.append(
                        f"C-20 FAILED: {result['_error']}"
                    )

            elif issue.check_id == "C-10":
                task_id = ctx["task_id"]
                status = ctx["status"]
                result = _api_patch(api_base, f"/tasks/{task_id}",
                                    {"status": status, "suppress_token_event": True})
                if _succeeded(result):
                    report.fixes_applied.append(
                        f"C-10 fixed: task {task_id[:8]}… status → {status!r}"
                    )
                elif result is not None:
                    report.fixes_applied.append(
                        f"C-10 FAILED: task {task_id[:8]}… status → {status!r}: {result['_error']}"
                    )

            elif issue.check_id == "C-11":
                ws_id = ctx["ws_id"]
                ws_status = ctx.get("ws_status", "")
                task = ctx["task"]
                wp_file = Path(ctx["wp_file"])
                t_id = str(task.get("id", "")).strip()
                # Skip creating tasks for finished workstreams — the workstream is
                # done/archived so unlinked tasks are stale file artefacts, not gaps.
                if normalise_workstream_status(ws_status) in CLOSED_WORKSTREAM_STATUSES:
                    report.fixes_applied.append(
                        f"C-11 skipped: task '{t_id}' in {ws_status} workstream — not created"
                    )
                else:
                    t_db_id = _create_linked_task(ws_id, task, wp_file)
                    if t_db_id:
                        report.fixes_applied.append(
                            f"C-11 fixed: task '{t_id}' → {t_db_id[:8]}…"
                        )
                    else:
                        report.fixes_applied.append(
                            f"C-11 FAILED: could not create task '{t_id}' in DB"
                        )

            elif issue.check_id == "C-12":
                task_id = ctx["task_id"]
                if ctx.get("ws_finished"):
                    result = _api_patch(api_base, f"/tasks/{task_id}", {"status": "cancel"})
                    if _succeeded(result):
                        report.fixes_applied.append(
                            f"C-12 fixed: orphan task {task_id[:8]}… canceled (workstream finished)"
                        )
                    elif result is not None:
                        report.fixes_applied.append(
                            f"C-12 FAILED: orphan task {task_id[:8]}… cancel: {result['_error']}"
                        )

            elif issue.check_id == "C-22":
                task_id = ctx["task_id"]
                description = ctx["description"]
                result = _api_patch(api_base, f"/tasks/{task_id}", {"description": description})
                if _succeeded(result):
                    report.fixes_applied.append(
                        f"C-22 fixed: task {task_id[:8]}… description updated"
                    )
                elif result is not None:
                    report.fixes_applied.append(
                        f"C-22 FAILED: task {task_id[:8]}… description update: {result['_error']}"
                    )

            elif issue.check_id == "C-15":
                # T03 — writeback: DB is ahead of file — patch file to match DB.
                if no_writeback:
                    report.fixes_applied.append(
                        f"C-15 skipped (--no-writeback): task '{ctx['task_block_id']}' "
                        f"db={ctx['db_status']!r}"
                    )
                else:
                    wp_file = Path(ctx["wp_file"])
                    task_block_id = ctx["task_block_id"]
                    db_status = ctx["db_status"]
                    old_status = issue.file_value
                    if _patch_task_status_in_file(wp_file, task_block_id, db_status):
                        committed = _git_commit_writeback(
                            repo_path,
                            wp_file,
                            [f"{task_block_id}: {old_status} → {db_status}"],
                        )
                        suffix = " (committed)" if committed else " (file patched, commit failed)"
                        report.fixes_applied.append(
                            f"C-15 fixed: task '{task_block_id}' "
                            f"{old_status} → {db_status}{suffix}"
                        )
                    else:
                        report.fixes_applied.append(
                            f"C-15 SKIP: could not locate task block '{task_block_id}' "
                            f"in {wp_file.name}"
                        )

        except Exception as e:
            report.fixes_applied.append(f"{issue.check_id} ERROR: {e}")

    # Record that a sync run happened for this repo
    import datetime as _dt
    from datetime import timezone as _tz

    now_iso = _dt.datetime.now(_tz.utc).isoformat()
    _api_patch(api_base, f"/repos/{repo_slug}", {"last_state_synced_at": now_iso})

    # Write the worker orientation brief (.custodian-brief.md). This is
    # best-effort: an exception here must not skip the push below, otherwise
    # this run's writeback commits become a C-17 backlog on the next run.
    if repo_path:
        try:
            brief_written = _write_custodian_brief(api_base, repo_slug, repo_path)
        except Exception as e:
            report.fixes_applied.append(f"brief ERROR: {e}")
        else:
            if brief_written:
                report.fixes_applied.append("brief: .custodian-brief.md updated")

    # Push all commits made this run (writebacks + brief) to close the loop.
    # This ensures the next run starts with local == remote, making the
    # timer idempotent and preventing commit pile-up.
    if repo_path:
        push_ok, push_msg = _git_push(repo_path)
        if push_ok:
            report.fixes_applied.append(f"push: {push_msg}")
        else:
            report.fixes_applied.append(f"push WARN: {push_msg}")

    return report
|
||
|
||
|
||
# Check IDs that are known-background noise in multi-machine setups:
|
||
# C-08 = finished/archived DB workstream with no file (pre-ADR-001 legacy)
|
||
# These alone do not warrant a pull+fix cycle.
|
||
_BACKGROUND_CHECKS: frozenset[str] = frozenset({"C-08"})
|
||
|
||
|
||
def _report_needs_action(
|
||
report: ConsistencyReport, behind_remote: bool, ahead_of_remote: int = 0
|
||
) -> bool:
|
||
"""Return True if the repo warrants a pull+fix cycle.
|
||
|
||
A repo is considered clean (no action needed) when:
|
||
- It is not behind its remote tracking branch, AND
|
||
- It has no unpushed local commits (from a prior failed push), AND
|
||
- It has no FAIL issues, AND
|
||
- Every WARN/INFO issue is in the background-noise set (C-08).
|
||
"""
|
||
if behind_remote or ahead_of_remote > 0:
|
||
return True
|
||
if report.failures:
|
||
return True
|
||
actionable_warns = [
|
||
i for i in report.warnings + report.infos
|
||
if i.check_id not in _BACKGROUND_CHECKS
|
||
]
|
||
return bool(actionable_warns)
|
||
|
||
|
||
def fix_all_remote(
    api_base: str,
    no_writeback: bool = False,
    max_seconds: int = DEFAULT_REMOTE_ALL_MAX_SECONDS,
) -> list[ConsistencyReport]:
    """Pull-then-fix all registered repos that need attention.

    For each repo returned by the State Hub ``/repos`` endpoint:
      1. Skip it if its resolved local path is not a directory on this host
         (the repo lives on another machine).
      2. Run check_repo(), _detect_behind_remote() and _detect_ahead_of_remote().
         No files or DB rows are written here, though the behind-remote check
         performs a git fetch.
      3. If _report_needs_action() says the repo is clean, skip it.
      4. Otherwise: git pull (a failed pull is recorded and the fix proceeds),
         then fix_repo().

    Args:
        api_base: State Hub API base URL.
        no_writeback: Forwarded to fix_repo() to disable DB→file writeback (C-15).
        max_seconds: Wall-clock budget for the whole run. Once it is exceeded,
            the current repo and every remaining one are reported as skipped.
            A value <= 0 disables the budget.

    Returns:
        One ConsistencyReport per repo that was actually fixed. Repos skipped as
        clean, absent from this host, or over budget are printed to stdout but
        not returned. An empty list is returned if the repo list cannot be fetched.
    """
    repos = _api_get(api_base, "/repos")
    if not isinstance(repos, list):
        print("ERROR: Could not fetch repos from state-hub API", file=sys.stderr)
        return []

    started = time.monotonic()
    reports: list[ConsistencyReport] = []
    skipped_clean: list[str] = []
    skipped_missing: list[str] = []
    skipped_budget: list[str] = []

    for idx, repo in enumerate(repos):
        slug = repo["slug"]
        if max_seconds > 0 and time.monotonic() - started > max_seconds:
            # Budget exhausted: list this repo and everything after it. Slicing by
            # the enumerate index (rather than a value lookup) stays correct even
            # if two repo entries compare equal.
            skipped_budget.append(slug)
            skipped_budget.extend(r.get("slug", "?") for r in repos[idx + 1:])
            break

        # Resolve path using the same priority as check_repo
        path = resolve_repo_path(repo)
        if not path or not Path(path).is_dir():
            skipped_missing.append(slug)
            continue

        # Detection pass: report issues and remote staleness without fixing.
        # _detect_behind_remote fetches first, so the subsequent
        # _detect_ahead_of_remote call compares against an up-to-date
        # tracking branch.
        pre_report = check_repo(api_base, slug)
        behind = _detect_behind_remote(path)
        ahead = _detect_ahead_of_remote(path)

        if not _report_needs_action(pre_report, behind, ahead):
            skipped_clean.append(slug)
            continue

        # Pull before fixing; a failed pull is recorded but does not block the fix.
        pull_ok, pull_msg = _git_pull(path)
        pull_tag = f"pull: {pull_msg}"
        if not pull_ok:
            pull_tag = f"pull WARN: {pull_msg} — proceeding anyway"

        report = fix_repo(api_base, slug, no_writeback=no_writeback)
        report.fixes_applied.insert(0, pull_tag)
        reports.append(report)

    # Print clean/missing summary lines before the detailed reports
    if skipped_clean:
        print(f" CLEAN (skipped): {', '.join(skipped_clean)}")
    if skipped_missing:
        print(f" NOT ON THIS HOST (skipped): {', '.join(skipped_missing)}")
    if skipped_budget:
        print(
            f" BUDGET EXHAUSTED after {max_seconds}s (skipped): "
            f"{', '.join(skipped_budget)}"
        )
    if skipped_clean or skipped_missing or skipped_budget:
        print()

    return reports
|
||
|
||
|
||
def archive_closed_workplans(
    repo_path: str,
    completion_date: str | None = None,
    workplan: str | None = None,
) -> list[str]:
    """Move closed root workplans into workplans/archived/ with YYMMDD prefix.

    Only root-level files (``workplans/*.md``) are considered. A file is moved
    only if all of the following hold:
      - it starts with frontmatter that parses without error;
      - its frontmatter status normalises to a CLOSED_WORKSTREAM_STATUSES value
        (finished or archived);
      - none of its task blocks has a status outside TERMINAL_TASK_STATUSES.

    Args:
        repo_path: Local checkout root. An empty/None value returns [] rather
            than silently falling back to the current directory (``Path("")``
            is ``"."``).
        completion_date: Filename prefix. Defaults to today's local date
            formatted as YYMMDD.
        workplan: Optional filter. Only the workplan whose frontmatter ``id``,
            stem, or filename matches (a trailing ".md" is ignored) is
            considered.

    Returns:
        One ``"old -> new"`` entry per moved file, with paths relative to
        repo_path.

    Raises:
        FileExistsError: if an archive target already exists. Files moved
            earlier in the same call remain moved.
    """
    if not repo_path:
        return []
    repo_dir = Path(repo_path)
    workplans_dir = repo_dir / "workplans"
    archived_dir = workplans_dir / "archived"
    if not workplans_dir.is_dir():
        return []

    date_prefix = completion_date or datetime.now().strftime("%y%m%d")
    wanted = workplan.removesuffix(".md") if workplan else None
    moved: list[str] = []

    for wp_file in sorted(workplans_dir.glob("*.md")):
        text = wp_file.read_text(encoding="utf-8")
        if not text.startswith("---"):
            continue
        meta, body = parse_frontmatter(text)
        if not meta or meta.get("_parse_error"):
            continue
        if wanted is not None and wanted not in {
            str(meta.get("id", "")), wp_file.stem, wp_file.name
        }:
            continue
        status = normalise_workstream_status(str(meta.get("status", "")).strip())
        if status not in CLOSED_WORKSTREAM_STATUSES:
            continue
        tasks = get_tasks_from_workplan(meta, body)
        if any(
            normalise_task_status(t.get("status", "todo")) not in TERMINAL_TASK_STATUSES
            for t in tasks
        ):
            continue

        target = archived_dir / f"{date_prefix}-{canonical_workplan_filename(wp_file)}"
        if target.exists():
            raise FileExistsError(f"Archived workplan already exists: {target}")
        # Create archived/ lazily so a run that moves nothing leaves no stray
        # empty directory behind.
        archived_dir.mkdir(exist_ok=True)
        wp_file.rename(target)
        moved.append(f"{wp_file.relative_to(repo_dir)} -> {target.relative_to(repo_dir)}")

    return moved
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Output / rendering
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def render_renormalization_guide() -> str:
    """Render RENORMALIZATION_RULES and the next-guard checklist as plain text.

    Each rule is shown as a blank line, a "<check_id>: <invariant>" heading and
    its Detect/Repair/Tests lines. These are followed by a numbered
    "Add The Next Guard" checklist.
    """
    out = ["Lifecycle Renormalization Rules", "=" * 33]
    for rule in RENORMALIZATION_RULES:
        out.append("")
        out.append(f"{rule.check_id}: {rule.invariant}")
        out.append(f" Detect: {rule.detection}")
        out.append(f" Repair: {rule.repair}")
        out.append(f" Tests: {rule.test_anchor}")
    out += ["", "Add The Next Guard"]
    out.extend(
        f" {n}. {step}"
        for n, step in enumerate(RENORMALIZATION_NEXT_GUARD_CHECKLIST, start=1)
    )
    return "\n".join(out)
|
||
|
||
|
||
def render_text(report: ConsistencyReport, show_info: bool = True) -> str:
    """Render *report* as the human-readable text block printed by the CLI.

    Layout: a header (repo slug and path), then issues grouped FAIL → WARN →
    INFO, then any applied fixes, then a summary line and the overall RESULT.
    Empty sections are omitted. The INFO section is also omitted when
    *show_info* is False, although INFO issues are still counted in the
    summary line. The RESULT is FAIL if any failures exist, otherwise PASS
    (noting warnings if any).
    """
    SEP = "=" * 66
    lines = [
        "ADR-001 Consistency Report",
        f"Repo: {report.repo_slug}",
        f"Path: {report.repo_path}",
        SEP,
    ]

    for sev in ("FAIL", "WARN", "INFO"):
        section = [i for i in report.issues if i.severity == sev]
        if not section or (sev == "INFO" and not show_info):
            continue
        lines.append(f"\n {sev}S ({len(section)}):")
        for i in section:
            loc = f" [{i.file_path}]" if i.file_path else ""
            fix_tag = " [fixable]" if i.fixable else ""
            lines.append(f" {i.check_id}{loc}{fix_tag}")
            lines.append(f" {i.message}")
            # Only show the value pair when at least one side is truthy.
            if i.file_value or i.db_value:
                lines.append(f" file={i.file_value!r} db={i.db_value!r}")

    if report.fixes_applied:
        lines.append(f"\n FIXES APPLIED ({len(report.fixes_applied)}):")
        for fix in report.fixes_applied:
            lines.append(f" {fix}")

    lines.append(f"\n{SEP}")
    n_fail = len(report.failures)
    n_warn = len(report.warnings)
    n_info = len(report.infos)
    lines.append(f" {n_fail} fail | {n_warn} warn | {n_info} info")
    if n_fail:
        lines.append(" RESULT: ✗ FAIL")
    elif n_warn:
        lines.append(" RESULT: ✓ PASS (with warnings)")
    else:
        lines.append(" RESULT: ✓ PASS")
    lines.append(SEP)
    return "\n".join(lines)
|
||
|
||
|
||
def report_to_dict(report: ConsistencyReport) -> dict:
    """Convert *report* into a JSON-serialisable dict (used by ``--json``).

    Keys, in order: repo_slug, repo_path, issues (one dict per issue with its
    fields), fixes_applied, summary (fail/warn/info counts) and result
    ("fail" if any failures, else "warn" if any warnings, else "pass").
    """
    issue_keys = (
        "severity",
        "check_id",
        "message",
        "file_path",
        "db_id",
        "file_value",
        "db_value",
        "fixable",
    )
    issues = [{key: getattr(issue, key) for key in issue_keys} for issue in report.issues]

    if report.failures:
        outcome = "fail"
    elif report.warnings:
        outcome = "warn"
    else:
        outcome = "pass"

    counts = {
        "fail": len(report.failures),
        "warn": len(report.warnings),
        "info": len(report.infos),
    }
    return {
        "repo_slug": report.repo_slug,
        "repo_path": report.repo_path,
        "issues": issues,
        "fixes_applied": report.fixes_applied,
        "summary": counts,
        "result": outcome,
    }
|
||
|
||
|
||
def consistency_exit_code(reports: list[ConsistencyReport], *, remote_all: bool = False) -> int:
    """Return the strict CLI exit code for consistency reports.

    Returns 1 if any report has failures. Otherwise returns 0 when
    *remote_all* is set (warnings are tolerated for ``--remote --all``).
    Otherwise returns 2 if any report has warnings, and 0 if none do.
    """
    if any(r.failures for r in reports):
        return 1
    if remote_all:
        return 0
    if any(r.warnings for r in reports):
        return 2
    return 0
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# CLI entry point
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def main() -> None:
    """Command-line entry point for the ADR-001 consistency checker.

    Modes (mutually exclusive, exactly one required): --repo SLUG, --all,
    --here [PATH], --renormalization-guide. --fix applies auto-fixable issues;
    --remote pulls before fixing and implies --fix. Reports are printed as
    text or, with --json, as a single JSON document on stdout. In JSON mode,
    progress lines go to stderr so stdout stays parseable.

    Exits via sys.exit():
      - after a normal run, with the code from consistency_exit_code();
      - 0 for --renormalization-guide and the "nothing to do" paths;
      - 1 when --here finds no registered repo or --all cannot fetch repos;
      - 2 via argparse for invalid arguments (including a malformed
        --archive-date).
    """
    parser = argparse.ArgumentParser(
        description="ADR-001 consistency checker — bidirectional file↔DB validation",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--repo", metavar="SLUG",
                       help="Registered repo slug (e.g. the-custodian)")
    group.add_argument("--all", action="store_true",
                       help="Run checks against all repos with a resolvable path on this host")
    group.add_argument("--here", metavar="PATH", nargs="?", const="",
                       help="Infer repo slug from git remote URL at PATH (default: CWD)")
    group.add_argument("--renormalization-guide", action="store_true",
                       help="Print lifecycle renormalization rules and the add-next-guard checklist")
    parser.add_argument("--fix", action="store_true",
                        help="Apply auto-fixable issues (status drift, repo mismatch, etc.)")
    parser.add_argument("--remote", action="store_true",
                        help="Pull each repo before fixing; when used with --all, skips repos "
                             "that are already clean (no actionable issues, not behind remote). "
                             "Implies --fix.")
    parser.add_argument("--max-seconds", type=int, default=DEFAULT_REMOTE_ALL_MAX_SECONDS,
                        help="Wall-clock budget for --remote --all before remaining repos are skipped "
                             f"(default: {DEFAULT_REMOTE_ALL_MAX_SECONDS}; 0 disables)")
    parser.add_argument("--no-writeback", action="store_true", dest="no_writeback",
                        help="Disable DB→file status writeback (C-15) while keeping other fixes")
    parser.add_argument("--archive-closed", action="store_true",
                        help="Move closed root workplans to workplans/archived/YYMMDD-*.md")
    parser.add_argument("--archive-workplan", metavar="ID_OR_FILE", default=None,
                        help="When archiving, only move the matching workplan id or filename")
    parser.add_argument("--archive-date", metavar="YYMMDD", default=None,
                        help="Completion date prefix for --archive-closed (default: today)")
    parser.add_argument("--repo-path", metavar="PATH", default=None,
                        help="Override the local repo path (useful when the DB has a different "
                             "machine's path). Takes priority over host_paths and local_path.")
    parser.add_argument("--api-base", default="http://127.0.0.1:8000",
                        help="State Hub API base URL")
    parser.add_argument("--json", action="store_true", dest="as_json",
                        help="Output JSON instead of human-readable text")
    args = parser.parse_args()

    if args.renormalization_guide:
        if args.as_json:
            print(json.dumps({
                "rules": [
                    {
                        "check_id": rule.check_id,
                        "invariant": rule.invariant,
                        "detection": rule.detection,
                        "repair": rule.repair,
                        "test_anchor": rule.test_anchor,
                    }
                    for rule in RENORMALIZATION_RULES
                ],
                "add_next_guard": list(RENORMALIZATION_NEXT_GUARD_CHECKLIST),
            }, indent=2))
        else:
            print(render_renormalization_guide())
        sys.exit(0)

    # --archive-date is spliced into the archived filename, so reject anything
    # other than exactly six ASCII digits (e.g. "2024-05-01" or "../x").
    archive_date = args.archive_date
    if archive_date is not None and not (
        len(archive_date) == 6 and archive_date.isascii() and archive_date.isdigit()
    ):
        parser.error(f"--archive-date must be YYMMDD (six digits), got {archive_date!r}")

    import os as _os

    no_wb = getattr(args, "no_writeback", False)
    do_fix = args.fix or args.remote
    # With --json, stdout must carry only the JSON document, so progress lines
    # ("Detected: …", "pull: …") are written to stderr instead.
    info_stream = sys.stderr if args.as_json else sys.stdout

    # --here: infer slug from git remote URL, then run as single-repo check/fix
    if args.here is not None:
        search_path = args.here or _os.getcwd()
        inferred = _infer_slug_from_path(args.api_base, search_path)
        if inferred is None:
            print(
                f"ERROR: No registered repo with a matching remote_url found at '{search_path}'.",
                file=sys.stderr,
            )
            print(
                " Register the repo first: POST /repos/ with remote_url set.",
                file=sys.stderr,
            )
            sys.exit(1)
        inferred_slug, git_root = inferred
        print(f" Detected: {inferred_slug} ({git_root})", file=info_stream)
        if args.remote:
            # --remote is documented as "pull before fixing"; honour it for --here too.
            pull_ok, pull_msg = _git_pull(git_root)
            prefix = "pull" if pull_ok else "pull WARN"
            print(f" {prefix}: {pull_msg}", file=info_stream)
        if do_fix:
            reports = [fix_repo(args.api_base, inferred_slug, git_root, no_writeback=no_wb)]
        else:
            reports = [check_repo(args.api_base, inferred_slug, git_root)]
        if args.archive_closed:
            moved = archive_closed_workplans(git_root, args.archive_date, args.archive_workplan)
            reports[0].fixes_applied.extend(f"archive: {m}" for m in moved)
    # --remote --all: smart pull+fix across all repos
    elif args.remote and args.all:
        with run_lock("consistency-remote-all") as acquired:
            if not acquired:
                print(
                    "SKIP: another fix-consistency-remote --all run is already active",
                    file=sys.stderr,
                )
                sys.exit(0)
            reports = fix_all_remote(
                args.api_base,
                no_writeback=no_wb,
                max_seconds=args.max_seconds,
            )
        if not reports:
            sys.exit(0)
    else:
        # Resolve repo list
        hostname = socket.gethostname()
        repo_slugs: list[str] = []
        if args.all:
            repos = _api_get(args.api_base, "/repos")
            if not isinstance(repos, list):
                print("ERROR: Could not fetch repos from state-hub API", file=sys.stderr)
                sys.exit(1)
            repo_slugs = [
                r["slug"] for r in repos
                if r.get("local_path") or (r.get("host_paths") or {}).get(hostname)
            ]
            if not repo_slugs:
                print(
                    f"No repos with a path registered for host '{hostname}'.",
                    file=sys.stderr,
                )
                sys.exit(0)
        else:
            repo_slugs = [args.repo]

        # --repo-path only applies to single-repo runs; silently ignored with --all
        path_override = args.repo_path if not args.all else None

        if args.remote:
            # Single-repo remote: pull first, then fix
            slug = repo_slugs[0]
            repo = _api_get(args.api_base, f"/repos/{slug}")
            path = resolve_repo_path(repo or {}, path_override) if repo else (path_override or "")
            if path:
                pull_ok, pull_msg = _git_pull(path)
                prefix = "pull" if pull_ok else "pull WARN"
                print(f" {prefix}: {pull_msg}", file=info_stream)
            report = fix_repo(args.api_base, slug, path_override, no_writeback=no_wb)
            reports = [report]
        elif do_fix:
            reports = [
                fix_repo(args.api_base, slug, path_override, no_writeback=no_wb)
                for slug in repo_slugs
            ]
        else:
            reports = [check_repo(args.api_base, slug, path_override) for slug in repo_slugs]

        if args.archive_closed:
            for report in reports:
                # Guard: an empty repo_path would otherwise resolve to the CWD
                # inside archive_closed_workplans (Path("") == ".").
                if not report.repo_path:
                    continue
                moved = archive_closed_workplans(report.repo_path, args.archive_date, args.archive_workplan)
                report.fixes_applied.extend(f"archive: {m}" for m in moved)

    if args.as_json:
        output = (
            report_to_dict(reports[0])
            if len(reports) == 1
            else [report_to_dict(r) for r in reports]
        )
        print(json.dumps(output, indent=2))
    else:
        for report in reports:
            print(render_text(report))
            print()

    sys.exit(consistency_exit_code(reports, remote_all=args.remote and args.all))
|
||
|
||
|
||
if __name__ == "__main__":
    # Run the CLI only when this file is executed as a script, not on import.
    main()
|