Files
state-hub/scripts/validate_repo_adr.py

501 lines
18 KiB
Python

#!/usr/bin/env python3
"""validate_repo_adr.py — ADR-001 compliance checker.
Checks whether a repository is consistent with ADR-001: workplans and
work items must originate as Markdown files in the native repository;
the state-hub is a read/cache layer, never the origin.
Checks performed:
File-side (no API required):
1. workplans/ directory exists
2. Each .md file has valid YAML frontmatter with required fields
3. type == "workplan", status in valid set, id matches pattern
4. Filename starts with the id value
5. Embedded ```task blocks have id and status fields
State-hub cross-reference (requires API):
6. state_hub_workstream_id references resolve to real DB records
7. Orphan detection: DB workstreams for the domain with no backing file
Usage:
python scripts/validate_repo_adr.py <repo_path> [OPTIONS]
Options:
--domain SLUG Domain slug for orphan detection
--api-base URL State Hub API (default: http://127.0.0.1:8000)
--no-api Skip state-hub consistency checks
--json Output JSON instead of text
Exit codes:
0 — all checks pass (including warnings)
1 — one or more FAIL findings
"""
from __future__ import annotations
import argparse
import json
import re
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
_REPO_ROOT = Path(__file__).resolve().parent.parent
if str(_REPO_ROOT) not in sys.path:
sys.path.insert(0, str(_REPO_ROOT))
from api.workplan_status import ( # noqa: E402
CANONICAL_WORKSTREAM_STATUSES,
SUPPORTED_WORKSTREAM_STATUSES,
normalize_workstream_status,
)
try:
import yaml as _yaml
_HAS_YAML = True
except ImportError:
_HAS_YAML = False
try:
import httpx as _httpx
_HAS_HTTPX = True
except ImportError:
_HAS_HTTPX = False
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Frontmatter keys that every workplan file must define.
REQUIRED_FRONTMATTER = {"id", "type", "title", "domain", "status", "owner", "created"}
# Canonical workstream statuses, as exported by api.workplan_status.
VALID_WP_STATUSES = set(CANONICAL_WORKSTREAM_STATUSES)
# Every status spelling accepted in frontmatter; the status check reports the
# members that are not in VALID_WP_STATUSES as "legacy aliases".
SUPPORTED_WP_STATUSES = set(SUPPORTED_WORKSTREAM_STATUSES)
# Allowed values for the `status` and `priority` fields of a ```task block.
VALID_TASK_STATUSES = {"todo", "in_progress", "blocked", "done", "cancelled"}
VALID_TASK_PRIORITIES = {"low", "medium", "high", "critical"}
# Workplan id: "<UPPERCASE>-WP-<digits>" or "ADHOC-" followed by 4-2-2 digit groups.
_WP_ID_RE = re.compile(r"^(?:[A-Z]+-WP-\d+|ADHOC-\d{4}-\d{2}-\d{2})$")
# Task id: a workplan id followed by "-T<digits>".
_TASK_ID_RE = re.compile(r"^(?:[A-Z]+-WP-\d+|ADHOC-\d{4}-\d{2}-\d{2})-T\d+$")
# Fenced ```task block; group 1 is the text between the fences (DOTALL lets it span lines).
_TASK_BLOCK_RE = re.compile(r"```task\s*\n(.*?)\n```", re.DOTALL)
# File name carrying a six-digit prefix and a dash; group 1 is the remaining "<name>.md".
_ARCHIVED_WP_RE = re.compile(r"^\d{6}-(.+\.md)$")
def canonical_workplan_filename(path: Path) -> str:
    """Return the file name of *path* with any six-digit archive prefix removed.

    A name of the form ``NNNNNN-<name>.md`` (as matched by ``_ARCHIVED_WP_RE``)
    is reduced to ``<name>.md``; any other name is returned unchanged.
    """
    filename = path.name
    # Substitute the whole match with its captured "<name>.md" group.
    return _ARCHIVED_WP_RE.sub(lambda hit: hit.group(1), filename)
def iter_workplan_files(workplans_dir: Path, include_archived: bool = True) -> list[Path]:
    """List the ``*.md`` files of a workplans directory.

    Files directly inside *workplans_dir* come first, sorted. When
    *include_archived* is true and an ``archived`` sub-directory exists, its
    ``*.md`` files follow, sorted among themselves.
    """
    found = sorted(workplans_dir.glob("*.md"))
    if not include_archived:
        return found
    archive = workplans_dir / "archived"
    if archive.is_dir():
        found += sorted(archive.glob("*.md"))
    return found
# ---------------------------------------------------------------------------
# Data types
# ---------------------------------------------------------------------------
class Level:
    """Severity labels attached to each finding."""

    PASS, WARN, FAIL = "PASS", "WARN", "FAIL"
@dataclass
class Finding:
    """One result produced by a single check."""

    # Severity label (one of the Level constants).
    level: str
    # Short machine-friendly name of the check, e.g. "frontmatter-type".
    check: str
    # Human-readable explanation of the result.
    detail: str
    # File (or other reference) the finding is about; empty when not file-specific.
    file: str = field(default="")
@dataclass
class Report:
    """Findings collected while checking one repository."""

    # Path of the checked repository, stored as a string.
    repo_path: str
    # Findings in the order they were recorded.
    findings: list[Finding] = field(default_factory=list)

    def add(self, level: str, check: str, detail: str, file: str = "") -> None:
        """Record a new finding at the end of ``findings``."""
        entry = Finding(level=level, check=check, detail=detail, file=file)
        self.findings.append(entry)

    def _at_level(self, level: str) -> list[Finding]:
        """Return the findings whose level equals *level*, in recorded order."""
        return list(filter(lambda entry: entry.level == level, self.findings))

    @property
    def failures(self) -> list[Finding]:
        """Findings with level FAIL."""
        return self._at_level(Level.FAIL)

    @property
    def warnings(self) -> list[Finding]:
        """Findings with level WARN."""
        return self._at_level(Level.WARN)

    @property
    def passes(self) -> list[Finding]:
        """Findings with level PASS."""
        return self._at_level(Level.PASS)
# ---------------------------------------------------------------------------
# Parsing helpers
# ---------------------------------------------------------------------------
def _parse_yaml_block(raw: str) -> dict:
    """Parse a YAML string into a dict, with fallback to simple key:value.

    With PyYAML available the text is parsed by ``safe_load``. An empty (or
    otherwise falsy) document yields ``{}``. A syntax error, or a document
    whose top level is not a mapping (a bare scalar or a list), yields
    ``{"_parse_error": True}`` so callers can always treat the result as a
    dict.

    Without PyYAML, only unindented ``key: value`` lines are read; surrounding
    single or double quotes are stripped from the value.
    """
    if _HAS_YAML:
        try:
            loaded = _yaml.safe_load(raw) or {}
        except _yaml.YAMLError:
            return {"_parse_error": True}
        # safe_load returns whatever the document describes; callers use
        # .get(), so anything other than a mapping is reported as unparseable.
        if not isinstance(loaded, dict):
            return {"_parse_error": True}
        return loaded
    # Minimal fallback: flat key: value only
    result: dict = {}
    for line in raw.splitlines():
        if ":" in line and not line.startswith(" "):
            k, _, v = line.partition(":")
            result[k.strip()] = v.strip().strip('"').strip("'")
    return result
def parse_frontmatter(text: str) -> tuple[dict, str]:
    """Split YAML frontmatter from body. Returns ({}, text) if no frontmatter.

    The text must start with ``---``. The frontmatter ends at the first later
    line consisting only of ``---`` (trailing spaces or tabs allowed); a
    ``---`` inside a value, such as ``title: a --- b``, is not a delimiter.
    The returned body is everything after the closing ``---``. When no closing
    delimiter line exists, ``({}, text)`` is returned.
    """
    if not text.startswith("---"):
        return {}, text
    rest = text[3:]
    # MULTILINE anchors the closing delimiter to a line of its own; the
    # optional \r keeps CRLF files working.
    closing = re.search(r"^---[ \t]*\r?$", rest, re.MULTILINE)
    if closing is None:
        return {}, text
    meta = _parse_yaml_block(rest[:closing.start()].strip())
    return meta, rest[closing.start() + 3:]
def parse_task_blocks(body: str) -> list[dict]:
    """Return the parsed content of every ```task fenced block in *body*.

    Blocks are returned in document order; each one is the result of
    ``_parse_yaml_block`` applied to the stripped text between the fences.
    """
    parsed: list[dict] = []
    for fence in _TASK_BLOCK_RE.finditer(body):
        inner = fence.group(1).strip()
        parsed.append(_parse_yaml_block(inner))
    return parsed
# ---------------------------------------------------------------------------
# File-side checks
# ---------------------------------------------------------------------------
def _wp_field_text(mapping: dict, key: str) -> str:
    """Return ``mapping[key]`` as text; a missing or null value becomes ``""``."""
    value = mapping.get(key)
    return "" if value is None else str(value)


def _check_workplan_tasks(tasks: list, fname: str, report: Report) -> None:
    """Validate the id, status and priority of each parsed ```task block."""
    for i, task in enumerate(tasks, 1):
        tref = f"{fname}#task[{i}]"
        # A block whose YAML is a bare scalar or list is as unusable as one
        # with a syntax error, and .get() below needs a mapping.
        if not isinstance(task, dict) or task.get("_parse_error"):
            report.add(Level.FAIL, "task-parseable", f"Task block {i} failed to parse", tref)
            continue
        t_id = _wp_field_text(task, "id")
        if not t_id:
            report.add(Level.FAIL, "task-id", "Missing 'id' field", tref)
        elif not _TASK_ID_RE.match(t_id):
            report.add(Level.WARN, "task-id-format",
                       f"id {t_id!r} doesn't match [A-Z]+-WP-\\d+-T\\d+", tref)
        t_status = _wp_field_text(task, "status")
        if not t_status:
            report.add(Level.FAIL, "task-status", "Missing 'status' field", tref)
        elif t_status not in VALID_TASK_STATUSES:
            report.add(Level.FAIL, "task-status-value",
                       f"status {t_status!r} not in {sorted(VALID_TASK_STATUSES)}", tref)
        t_prio = _wp_field_text(task, "priority")
        if not t_prio:
            report.add(Level.WARN, "task-priority", "Missing 'priority' field", tref)
        elif t_prio not in VALID_TASK_PRIORITIES:
            report.add(Level.WARN, "task-priority-value",
                       f"priority {t_prio!r} not in {sorted(VALID_TASK_PRIORITIES)}", tref)


def _check_workplan_file(wp_file: Path, report: Report) -> dict | None:
    """Validate one workplan file. Returns parsed frontmatter on success.

    Findings are appended to *report*, keyed by the file's path relative to
    ``report.repo_path``. ``None`` is returned when the file cannot be read or
    decoded as UTF-8, does not start with ``---``, or has frontmatter that is
    empty, unparseable or not a mapping. Otherwise every field and task check
    runs and the frontmatter dict is returned, even if some checks failed.
    """
    fname = str(wp_file.relative_to(Path(report.repo_path)))
    canonical_fname = canonical_workplan_filename(wp_file)
    try:
        text = wp_file.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as e:
        # UnicodeDecodeError is not an OSError; without it a non-UTF-8 file
        # would abort the whole run instead of producing a finding.
        report.add(Level.FAIL, "file-readable", str(e), fname)
        return None
    if not text.startswith("---"):
        report.add(Level.FAIL, "frontmatter-present",
                   "File does not start with '---'; YAML frontmatter required", fname)
        return None
    meta, body = parse_frontmatter(text)
    if not isinstance(meta, dict) or not meta or meta.get("_parse_error"):
        report.add(Level.FAIL, "frontmatter-parseable",
                   "YAML frontmatter could not be parsed", fname)
        return None
    # Required fields
    missing = REQUIRED_FRONTMATTER - set(meta.keys())
    if missing:
        report.add(Level.FAIL, "frontmatter-required-fields",
                   f"Missing fields: {', '.join(sorted(missing))}", fname)
    else:
        report.add(Level.PASS, "frontmatter-required-fields",
                   "All required fields present", fname)
    # type
    if meta.get("type") != "workplan":
        report.add(Level.FAIL, "frontmatter-type",
                   f"type must be 'workplan', got {meta.get('type')!r}", fname)
    else:
        report.add(Level.PASS, "frontmatter-type", "type=workplan", fname)
    # status
    status = str(meta.get("status", ""))
    if status not in SUPPORTED_WP_STATUSES:
        report.add(Level.FAIL, "frontmatter-status",
                   f"status must be one of {sorted(VALID_WP_STATUSES)} "
                   f"(legacy aliases accepted: {sorted(SUPPORTED_WP_STATUSES - VALID_WP_STATUSES)}), "
                   f"got {status!r}", fname)
    else:
        report.add(Level.PASS, "frontmatter-status",
                   f"status={normalize_workstream_status(status)}", fname)
    # id format
    wp_id = str(meta.get("id", ""))
    if not _WP_ID_RE.match(wp_id):
        report.add(Level.FAIL, "frontmatter-id-format",
                   f"id must match [A-Z]+-WP-\\d+ (e.g. CUST-WP-0001), got {wp_id!r}", fname)
    else:
        report.add(Level.PASS, "frontmatter-id-format", f"id={wp_id}", fname)
    # filename prefix (compared against the name without its archive prefix)
    if wp_id and not canonical_fname.startswith(wp_id):
        report.add(Level.WARN, "filename-id-prefix",
                   f"Filename should start with id '{wp_id}', got {fname!r}", fname)
    elif wp_id:
        report.add(Level.PASS, "filename-id-prefix", "Filename matches id prefix", fname)
    # domain non-empty; a null YAML value ("domain:") counts as empty rather
    # than becoming the string "None".
    domain = _wp_field_text(meta, "domain").strip()
    if not domain:
        report.add(Level.FAIL, "frontmatter-domain", "domain must be a non-empty string", fname)
    else:
        report.add(Level.PASS, "frontmatter-domain", f"domain={domain}", fname)
    # task blocks
    tasks = parse_task_blocks(body)
    if not tasks:
        report.add(Level.WARN, "tasks-present",
                   "No ```task blocks found — intentional for a workplan with no tasks?", fname)
    else:
        report.add(Level.PASS, "tasks-present", f"{len(tasks)} task block(s) found", fname)
    _check_workplan_tasks(tasks, fname, report)
    return meta
def check_files(workplans_dir: Path, report: Report) -> list[dict]:
    """Check all workplan .md files in workplans_dir.

    Returns the frontmatter dict of each file that ``_check_workplan_file``
    accepted. Each dict gains an ``_active_file`` flag that is true when the
    file sits directly in *workplans_dir* rather than in a sub-directory.
    A WARN finding is recorded, and ``[]`` returned, when no files are found.
    """
    candidates = iter_workplan_files(workplans_dir)
    if not candidates:
        report.add(Level.WARN, "workplans-not-empty",
                   "workplans/ directory exists but contains no .md files")
        return []
    collected = []
    for candidate in candidates:
        frontmatter = _check_workplan_file(candidate, report)
        if not frontmatter:
            continue
        frontmatter["_active_file"] = candidate.parent == workplans_dir
        collected.append(frontmatter)
    return collected
# ---------------------------------------------------------------------------
# State-hub API checks
# ---------------------------------------------------------------------------
def _api_get(api_base: str, path: str, params: dict | None = None) -> Any:
    """GET *path* from the API and return the decoded JSON body.

    A trailing slash is appended to *path* when absent, and query parameters
    whose value is ``None`` are dropped. Returns ``None`` when httpx is not
    installed or when the request, the status check or the JSON decoding
    raises — callers cannot tell these cases apart.
    """
    if not _HAS_HTTPX:
        return None
    endpoint = path if path.endswith("/") else path + "/"
    try:
        with _httpx.Client(base_url=api_base, timeout=10.0, follow_redirects=True) as client:
            query = {key: value for key, value in (params or {}).items() if value is not None}
            response = client.get(endpoint, params=query)
            response.raise_for_status()
            return response.json()
    except Exception:
        # Best-effort lookup: any failure is reported to the caller as None.
        return None
def check_api(api_base: str, metas: list[dict], domain_slug: str | None,
              report: Report) -> None:
    """Cross-reference workplan files against the live state-hub database.

    Steps, each recording findings on *report*:

    1. Probe ``/state/health``; if it is unreachable, record a WARN and stop.
    2. For every frontmatter dict in *metas*, look up its
       ``state_hub_workstream_id``. A missing, null or blank id is a WARN; an
       id the API does not return is a FAIL.
    3. Orphan detection: for every topic whose domain is *domain_slug* or the
       domain of any workplan, each workstream whose normalized status is not
       ``finished``/``archived`` must be referenced by an active (non-archived)
       workplan file, otherwise a FAIL is recorded.
    """
    health = _api_get(api_base, "/state/health")
    if health is None:
        report.add(Level.WARN, "api-reachable",
                   f"State Hub API not reachable at {api_base} — skipping cross-reference checks")
        return
    report.add(Level.PASS, "api-reachable", f"State Hub API reachable at {api_base}")

    # Verify each state_hub_workstream_id reference
    active_file_ws_ids: set[str] = set()
    for meta in metas:
        wp_ref = str(meta.get("id", ""))
        # A key present with a null YAML value must count as "no id"; str(None)
        # would otherwise be looked up as the id "None" and reported as stale.
        raw_ws_id = meta.get("state_hub_workstream_id")
        ws_id = "" if raw_ws_id is None else str(raw_ws_id).strip()
        if not ws_id:
            report.add(Level.WARN, "workstream-id-present",
                       f"Workplan {meta.get('id')} has no state_hub_workstream_id "
                       f"— not indexed in state-hub",
                       wp_ref)
            continue
        if meta.get("_active_file", True):
            active_file_ws_ids.add(ws_id)
        ws = _api_get(api_base, f"/workstreams/{ws_id}")
        if ws is None:
            report.add(Level.FAIL, "workstream-ref-exists",
                       f"state_hub_workstream_id {ws_id} not found in DB (stale reference)",
                       wp_ref)
        else:
            report.add(Level.PASS, "workstream-ref-exists",
                       f"Workstream {ws_id[:8]}… ({ws.get('slug')}) confirmed in DB",
                       wp_ref)

    # Orphan detection: DB workstreams with no backing file
    domains_to_check: set[str] = set()
    if domain_slug:
        domains_to_check.add(domain_slug)
    for meta in metas:
        raw_domain = meta.get("domain")
        d = "" if raw_domain is None else str(raw_domain).strip()
        if d:
            domains_to_check.add(d)
    if not domains_to_check:
        report.add(Level.WARN, "orphan-detection",
                   "No domain slugs available for orphan detection — pass --domain to enable")
        return
    topics = _api_get(api_base, "/topics")
    if not isinstance(topics, list):
        report.add(Level.WARN, "orphan-detection", "Could not fetch topics for orphan detection")
        return
    for topic in topics:
        t_domain = topic.get("domain", "")
        if t_domain not in domains_to_check:
            continue
        t_id = topic["id"]
        workstreams = _api_get(api_base, "/workstreams", {"topic_id": t_id})
        if not isinstance(workstreams, list):
            report.add(Level.WARN, "orphan-detection",
                       f"Could not fetch workstreams for topic {str(t_id)[:8]}… (domain={t_domain})")
            continue
        for ws in workstreams:
            ws_status = ws.get("status", "")
            if normalize_workstream_status(ws_status) in {"finished", "archived"}:
                continue
            # File-side ids are strings, so compare (and slice) the API id as text.
            ws_id = str(ws["id"])
            ws_slug = ws.get("slug", "")
            if ws_id not in active_file_ws_ids:
                report.add(
                    Level.FAIL, "orphan-workstream",
                    f"Active workstream '{ws_slug}' (id={ws_id[:8]}…, domain={t_domain}) "
                    f"exists in DB but has no backing workplan file — ADR-001 violation",
                )
            else:
                report.add(Level.PASS, "orphan-workstream",
                           f"Workstream '{ws_slug}' is backed by a workplan file")
# ---------------------------------------------------------------------------
# Top-level runner
# ---------------------------------------------------------------------------
def validate(repo_path: Path, api_base: str = "http://127.0.0.1:8000",
             domain_slug: str | None = None, skip_api: bool = False) -> Report:
    """Run all ADR-001 checks for a repository. Returns a Report.

    A missing ``<repo_path>/workplans`` directory is recorded as a single FAIL
    and ends the run. Otherwise the file checks run, followed by the state-hub
    API checks unless *skip_api* is true.
    """
    report = Report(repo_path=str(repo_path))
    workplans_dir = repo_path / "workplans"
    if workplans_dir.is_dir():
        report.add(Level.PASS, "workplans-dir", "workplans/ directory exists")
        metas = check_files(workplans_dir, report)
        if not skip_api:
            check_api(api_base, metas, domain_slug, report)
    else:
        report.add(Level.FAIL, "workplans-dir",
                   "No workplans/ directory found. "
                   "ADR-001 requires workplan files at <repo>/workplans/<ID>-<slug>.md")
    return report
def render_text(report: Report) -> str:
    """Render a Report as human-readable text.

    Findings are grouped by level in the order FAIL, WARN, PASS; empty groups
    are omitted. A summary line with the three counts and an overall result
    line close the report.
    """
    rule = "=" * 62
    out = ["ADR-001 Compliance Report", f"Repo: {report.repo_path}", rule]
    for level in (Level.FAIL, Level.WARN, Level.PASS):
        group = [item for item in report.findings if item.level == level]
        if group:
            # NOTE(review): the heading is the level name plus "S", so the
            # PASS group is titled "PASSS".
            out.append(f"\n {level}S ({len(group)}):")
            for item in group:
                location = f" [{item.file}]" if item.file else ""
                out.extend((f" {item.check}{location}", f" {item.detail}"))
    out.append(f"\n{rule}")
    out.append(
        f" {len(report.passes)} pass | "
        f"{len(report.warnings)} warn | "
        f"{len(report.failures)} fail"
    )
    if report.failures:
        verdict = " RESULT: ✗ FAIL"
    elif report.warnings:
        verdict = " RESULT: ✓ PASS (with warnings)"
    else:
        verdict = " RESULT: ✓ PASS"
    out.append(verdict)
    out.append(rule)
    return "\n".join(out)
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
def main() -> None:
    """Command-line entry point.

    Parses the arguments, runs ``validate`` and prints the report as text or,
    with ``--json``, as JSON. Exits with status 1 when the report contains any
    FAIL finding and 0 otherwise.
    """
    parser = argparse.ArgumentParser(
        description="ADR-001 compliance checker for custodian-ecosystem repos",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument("repo_path", help="Path to the repository root")
    parser.add_argument("--domain", dest="domain_slug", default=None,
                        help="Domain slug for orphan detection (e.g. custodian)")
    parser.add_argument("--api-base", default="http://127.0.0.1:8000",
                        help="State Hub API base URL")
    parser.add_argument("--no-api", action="store_true",
                        help="Skip state-hub API consistency checks")
    parser.add_argument("--json", action="store_true", dest="as_json",
                        help="Output JSON instead of text")
    args = parser.parse_args()

    report = validate(
        repo_path=Path(args.repo_path).resolve(),
        api_base=args.api_base,
        domain_slug=args.domain_slug,
        skip_api=args.no_api,
    )

    if not args.as_json:
        print(render_text(report))
    else:
        if report.failures:
            outcome = "fail"
        elif report.warnings:
            outcome = "warn"
        else:
            outcome = "pass"
        findings = [
            {"level": item.level, "check": item.check, "detail": item.detail, "file": item.file}
            for item in report.findings
        ]
        payload = {
            "repo_path": report.repo_path,
            "findings": findings,
            "summary": {
                "pass": len(report.passes),
                "warn": len(report.warnings),
                "fail": len(report.failures),
            },
            "result": outcome,
        }
        print(json.dumps(payload, indent=2))

    # Warnings alone do not make the run fail.
    sys.exit(1 if report.failures else 0)


if __name__ == "__main__":
    main()