#!/usr/bin/env python3
"""validate_repo_adr.py — ADR-001 compliance checker.

Checks whether a repository is consistent with ADR-001: workplans and work
items must originate as Markdown files in the native repository; the state-hub
is a read/cache layer, never the origin.

Checks performed:

  File-side (no API required):
    1. workplans/ directory exists
    2. Each .md file has valid YAML frontmatter with required fields
    3. type == "workplan", status in valid set, id matches pattern
    4. Filename starts with the id value
    5. Embedded ```task blocks have id and status fields

  State-hub cross-reference (requires API):
    6. state_hub_workstream_id references resolve to real DB records
    7. Orphan detection: DB workstreams for the domain with no backing file

Usage:
    python scripts/validate_repo_adr.py REPO_PATH [OPTIONS]

Options:
    --domain SLUG     Domain slug for orphan detection
    --api-base URL    State Hub API (default: http://127.0.0.1:8000)
    --no-api          Skip state-hub consistency checks
    --json            Output JSON instead of text

Exit codes:
    0 — all checks pass (including warnings)
    1 — one or more FAIL findings
"""

from __future__ import annotations

import argparse
import json
import re
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

# PyYAML and httpx are optional: without PyYAML a flat "key: value" fallback
# parser is used; without httpx the state-hub cross-reference is skipped.
try:
    import yaml as _yaml
    _HAS_YAML = True
except ImportError:
    _HAS_YAML = False

try:
    import httpx as _httpx
    _HAS_HTTPX = True
except ImportError:
    _HAS_HTTPX = False


# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

REQUIRED_FRONTMATTER = {"id", "type", "title", "domain", "status", "owner", "created"}
VALID_WP_STATUSES = {"active", "completed", "archived"}
VALID_TASK_STATUSES = {"todo", "in_progress", "blocked", "done", "cancelled"}
VALID_TASK_PRIORITIES = {"low", "medium", "high", "critical"}

_WP_ID_RE = re.compile(r"^[A-Z]+-WP-\d+$")
_TASK_ID_RE = re.compile(r"^[A-Z]+-WP-\d+-T\d+$")
_TASK_BLOCK_RE = re.compile(r"```task\s*\n(.*?)\n```", re.DOTALL)

# Frontmatter is delimited by lines consisting solely of '---'.  Anchoring the
# closing delimiter to a line start keeps a '---' that appears inside a value
# (e.g. "title: a --- b") from being mistaken for the end of the block.
_FRONTMATTER_RE = re.compile(
    r"\A---[ \t]*\r?\n(?P<meta>.*?)^---[ \t]*\r?$",
    re.DOTALL | re.MULTILINE,
)


# ---------------------------------------------------------------------------
# Data types
# ---------------------------------------------------------------------------

class Level:
    """Severity labels attached to findings."""

    PASS = "PASS"
    WARN = "WARN"
    FAIL = "FAIL"


@dataclass
class Finding:
    """One check result: severity, check name, detail text, optional file."""

    level: str
    check: str
    detail: str
    file: str = ""


@dataclass
class Report:
    """Ordered collection of findings for one repository."""

    repo_path: str
    findings: list[Finding] = field(default_factory=list)

    def add(self, level: str, check: str, detail: str, file: str = "") -> None:
        """Append a new finding to the report."""
        self.findings.append(Finding(level=level, check=check, detail=detail, file=file))

    @property
    def failures(self) -> list[Finding]:
        """Findings with level FAIL."""
        return [f for f in self.findings if f.level == Level.FAIL]

    @property
    def warnings(self) -> list[Finding]:
        """Findings with level WARN."""
        return [f for f in self.findings if f.level == Level.WARN]

    @property
    def passes(self) -> list[Finding]:
        """Findings with level PASS."""
        return [f for f in self.findings if f.level == Level.PASS]


# ---------------------------------------------------------------------------
# Parsing helpers
# ---------------------------------------------------------------------------

def _parse_yaml_block(raw: str) -> dict:
    """Parse a YAML string into a dict, with fallback to simple key:value.

    Returns ``{"_parse_error": True}`` when PyYAML rejects the text or when
    the document is valid YAML but not a mapping (a bare scalar or a list);
    callers use ``.get`` / ``.keys`` on the result, so a non-dict must never
    leak out.  An empty document yields ``{}``.
    """
    if _HAS_YAML:
        try:
            parsed = _yaml.safe_load(raw) or {}
        except _yaml.YAMLError:
            return {"_parse_error": True}
        if not isinstance(parsed, dict):
            return {"_parse_error": True}
        return parsed

    # Minimal fallback: flat key: value only
    result: dict = {}
    for line in raw.splitlines():
        if ":" in line and not line.startswith(" "):
            k, _, v = line.partition(":")
            result[k.strip()] = v.strip().strip('"').strip("'")
    return result


def parse_frontmatter(text: str) -> tuple[dict, str]:
    """Split YAML frontmatter from body. Returns ({}, text) if no frontmatter.

    The frontmatter must open with a '---' line at the very start of the text
    and close with another '---' line.  The returned body is everything after
    the closing delimiter (it normally starts with that line's newline).
    """
    match = _FRONTMATTER_RE.match(text)
    if match is None:
        return {}, text
    meta = _parse_yaml_block(match.group("meta").strip())
    return meta, text[match.end():]


def parse_task_blocks(body: str) -> list[dict]:
    """Extract all ```task ... ``` YAML blocks from a workplan body."""
    return [_parse_yaml_block(m.group(1).strip()) for m in _TASK_BLOCK_RE.finditer(body)]


# ---------------------------------------------------------------------------
# File-side checks
# ---------------------------------------------------------------------------

def _filename_matches_id(fname: str, wp_id: str) -> bool:
    """Return True if *fname* starts with *wp_id* as a complete id.

    A bare ``startswith`` would accept ``CUST-WP-10-x.md`` for id
    ``CUST-WP-1``; the id's trailing number must not continue with more digits.
    """
    if not fname.startswith(wp_id):
        return False
    remainder = fname[len(wp_id):]
    return not (wp_id[-1:].isdigit() and remainder[:1].isdigit())


def _check_task(task: dict, index: int, fname: str, report: Report) -> None:
    """Validate one parsed ```task block and record findings on *report*."""
    tref = f"{fname}#task[{index}]"
    if task.get("_parse_error"):
        report.add(Level.FAIL, "task-parseable", f"Task block {index} failed to parse", tref)
        return

    t_id = str(task.get("id", ""))
    if not t_id:
        report.add(Level.FAIL, "task-id", "Missing 'id' field", tref)
    elif not _TASK_ID_RE.match(t_id):
        report.add(Level.WARN, "task-id-format",
                   f"id {t_id!r} doesn't match [A-Z]+-WP-\\d+-T\\d+", tref)

    t_status = str(task.get("status", ""))
    if not t_status:
        report.add(Level.FAIL, "task-status", "Missing 'status' field", tref)
    elif t_status not in VALID_TASK_STATUSES:
        report.add(Level.FAIL, "task-status-value",
                   f"status {t_status!r} not in {sorted(VALID_TASK_STATUSES)}", tref)

    t_prio = str(task.get("priority", ""))
    if not t_prio:
        report.add(Level.WARN, "task-priority", "Missing 'priority' field", tref)
    elif t_prio not in VALID_TASK_PRIORITIES:
        report.add(Level.WARN, "task-priority-value",
                   f"priority {t_prio!r} not in {sorted(VALID_TASK_PRIORITIES)}", tref)


def _check_workplan_file(wp_file: Path, report: Report) -> dict | None:
    """Validate one workplan file. Returns parsed frontmatter on success.

    Returns None when the file cannot be read or decoded, has no frontmatter,
    or its frontmatter cannot be parsed; in every other case the frontmatter
    dict is returned even if individual field checks failed.
    """
    fname = wp_file.name
    try:
        text = wp_file.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as e:
        # UnicodeDecodeError is a ValueError, not an OSError: without it a
        # single non-UTF-8 file would abort the whole run with a traceback.
        report.add(Level.FAIL, "file-readable", str(e), fname)
        return None

    if not text.startswith("---"):
        report.add(Level.FAIL, "frontmatter-present",
                   "File does not start with '---'; YAML frontmatter required", fname)
        return None

    meta, body = parse_frontmatter(text)
    if not meta or meta.get("_parse_error"):
        report.add(Level.FAIL, "frontmatter-parseable",
                   "YAML frontmatter could not be parsed", fname)
        return None

    # Required fields
    missing = REQUIRED_FRONTMATTER - set(meta.keys())
    if missing:
        report.add(Level.FAIL, "frontmatter-required-fields",
                   f"Missing fields: {', '.join(sorted(missing))}", fname)
    else:
        report.add(Level.PASS, "frontmatter-required-fields",
                   "All required fields present", fname)

    # type
    if meta.get("type") != "workplan":
        report.add(Level.FAIL, "frontmatter-type",
                   f"type must be 'workplan', got {meta.get('type')!r}", fname)
    else:
        report.add(Level.PASS, "frontmatter-type", "type=workplan", fname)

    # status
    status = str(meta.get("status", ""))
    if status not in VALID_WP_STATUSES:
        report.add(Level.FAIL, "frontmatter-status",
                   f"status must be one of {sorted(VALID_WP_STATUSES)}, got {status!r}", fname)
    else:
        report.add(Level.PASS, "frontmatter-status", f"status={status}", fname)

    # id format
    wp_id = str(meta.get("id", ""))
    if not _WP_ID_RE.match(wp_id):
        report.add(Level.FAIL, "frontmatter-id-format",
                   f"id must match [A-Z]+-WP-\\d+ (e.g. CUST-WP-0001), got {wp_id!r}", fname)
    else:
        report.add(Level.PASS, "frontmatter-id-format", f"id={wp_id}", fname)

    # filename prefix
    if wp_id and not _filename_matches_id(fname, wp_id):
        report.add(Level.WARN, "filename-id-prefix",
                   f"Filename should start with id '{wp_id}', got {fname!r}", fname)
    elif wp_id:
        report.add(Level.PASS, "filename-id-prefix", "Filename matches id prefix", fname)

    # domain non-empty
    domain = str(meta.get("domain", "")).strip()
    if not domain:
        report.add(Level.FAIL, "frontmatter-domain",
                   "domain must be a non-empty string", fname)
    else:
        report.add(Level.PASS, "frontmatter-domain", f"domain={domain}", fname)

    # task blocks
    tasks = parse_task_blocks(body)
    if not tasks:
        report.add(Level.WARN, "tasks-present",
                   "No ```task blocks found — intentional for a workplan with no tasks?", fname)
    else:
        report.add(Level.PASS, "tasks-present", f"{len(tasks)} task block(s) found", fname)

    for i, task in enumerate(tasks, 1):
        _check_task(task, i, fname, report)

    return meta


def check_files(workplans_dir: Path, report: Report) -> list[dict]:
    """Check all workplan .md files in workplans_dir.

    Returns the parsed frontmatter of every file whose frontmatter could be
    read and parsed, in sorted filename order.
    """
    md_files = sorted(workplans_dir.glob("*.md"))
    if not md_files:
        report.add(Level.WARN, "workplans-not-empty",
                   "workplans/ directory exists but contains no .md files")
        return []

    metas = []
    for wp_file in md_files:
        meta = _check_workplan_file(wp_file, report)
        if meta:
            metas.append(meta)
    return metas


# ---------------------------------------------------------------------------
# State-hub API checks
# ---------------------------------------------------------------------------

def _api_get(api_base: str, path: str, params: dict | None = None) -> Any:
    """GET *path* from the State Hub and return the decoded JSON, or None.

    None is returned when httpx is unavailable and for any failure (transport
    error, non-2xx status, invalid JSON); callers cannot tell these apart.
    A trailing slash is appended to *path* and None-valued params are dropped.
    """
    if not _HAS_HTTPX:
        return None
    if not path.endswith("/"):
        path += "/"
    try:
        with _httpx.Client(base_url=api_base, timeout=10.0, follow_redirects=True) as c:
            r = c.get(path, params={k: v for k, v in (params or {}).items() if v is not None})
            r.raise_for_status()
            return r.json()
    except Exception:
        # Deliberate best-effort boundary: every API problem degrades to None.
        return None


def check_api(api_base: str, metas: list[dict], domain_slug: str | None,
              report: Report) -> None:
    """Cross-reference workplan files against the live state-hub database.

    Verifies each ``state_hub_workstream_id`` found in *metas*, then looks for
    non-completed, non-archived DB workstreams in the relevant domains that no
    workplan file references.  All results are recorded on *report*.
    """
    if not _HAS_HTTPX:
        # Say what is actually wrong instead of blaming the API endpoint.
        report.add(Level.WARN, "api-reachable",
                   "httpx is not installed — skipping cross-reference checks")
        return

    health = _api_get(api_base, "/state/health")
    if health is None:
        report.add(Level.WARN, "api-reachable",
                   f"State Hub API not reachable at {api_base} — skipping cross-reference checks")
        return
    report.add(Level.PASS, "api-reachable", f"State Hub API reachable at {api_base}")

    # Verify each state_hub_workstream_id reference
    file_ws_ids: set[str] = set()
    for meta in metas:
        ws_id = str(meta.get("state_hub_workstream_id", "")).strip()
        if not ws_id:
            report.add(Level.WARN, "workstream-id-present",
                       f"Workplan {meta.get('id')} has no state_hub_workstream_id "
                       f"— not indexed in state-hub", str(meta.get("id", "")))
            continue
        file_ws_ids.add(ws_id)
        ws = _api_get(api_base, f"/workstreams/{ws_id}")
        if ws is None:
            report.add(Level.FAIL, "workstream-ref-exists",
                       f"state_hub_workstream_id {ws_id} not found in DB (stale reference)",
                       str(meta.get("id", "")))
        else:
            report.add(Level.PASS, "workstream-ref-exists",
                       f"Workstream {ws_id[:8]}… ({ws.get('slug')}) confirmed in DB",
                       str(meta.get("id", "")))

    # Orphan detection: DB workstreams with no backing file
    domains_to_check: set[str] = set()
    if domain_slug:
        domains_to_check.add(domain_slug)
    for meta in metas:
        d = str(meta.get("domain", "")).strip()
        if d:
            domains_to_check.add(d)

    if not domains_to_check:
        report.add(Level.WARN, "orphan-detection",
                   "No domain slugs available for orphan detection — pass --domain to enable")
        return

    topics = _api_get(api_base, "/topics")
    if not isinstance(topics, list):
        report.add(Level.WARN, "orphan-detection",
                   "Could not fetch topics for orphan detection")
        return

    for topic in topics:
        t_domain = topic.get("domain", "")
        if t_domain not in domains_to_check:
            continue
        t_id = topic["id"]
        workstreams = _api_get(api_base, "/workstreams", {"topic_id": t_id})
        if not isinstance(workstreams, list):
            report.add(Level.WARN, "orphan-detection",
                       f"Could not fetch workstreams for topic {str(t_id)[:8]}… "
                       f"(domain={t_domain})")
            continue
        for ws in workstreams:
            ws_status = ws.get("status", "")
            if ws_status in ("completed", "archived"):
                continue
            # File-side ids are strings; normalise so non-string ids from the
            # API still compare and slice correctly.
            ws_id = str(ws["id"])
            ws_slug = ws.get("slug", "")
            if ws_id not in file_ws_ids:
                report.add(
                    Level.FAIL, "orphan-workstream",
                    f"Active workstream '{ws_slug}' (id={ws_id[:8]}…, domain={t_domain}) "
                    f"exists in DB but has no backing workplan file — ADR-001 violation",
                )
            else:
                report.add(Level.PASS, "orphan-workstream",
                           f"Workstream '{ws_slug}' is backed by a workplan file")


# ---------------------------------------------------------------------------
# Top-level runner
# ---------------------------------------------------------------------------

def validate(repo_path: Path, api_base: str = "http://127.0.0.1:8000",
             domain_slug: str | None = None, skip_api: bool = False) -> Report:
    """Run all ADR-001 checks for a repository. Returns a Report.

    If ``repo_path / "workplans"`` is not a directory, a single FAIL finding
    is recorded and no further checks run.  The state-hub checks are skipped
    when *skip_api* is true.
    """
    report = Report(repo_path=str(repo_path))
    workplans_dir = repo_path / "workplans"

    if not workplans_dir.is_dir():
        report.add(Level.FAIL, "workplans-dir",
                   "No workplans/ directory found. "
                   "ADR-001 requires workplan files at <repo>/workplans/<id>-<name>.md")
        return report

    report.add(Level.PASS, "workplans-dir", "workplans/ directory exists")
    metas = check_files(workplans_dir, report)

    if not skip_api:
        check_api(api_base, metas, domain_slug, report)

    return report


def render_text(report: Report) -> str:
    """Render a Report as human-readable text.

    Findings are grouped FAIL, then WARN, then PASS, followed by a summary
    line and an overall RESULT line.
    """
    SEP = "=" * 62
    lines = ["ADR-001 Compliance Report", f"Repo: {report.repo_path}", SEP]

    for level in (Level.FAIL, Level.WARN, Level.PASS):
        section = [f for f in report.findings if f.level == level]
        if not section:
            continue
        lines.append(f"\n {level}S ({len(section)}):")
        for f in section:
            loc = f" [{f.file}]" if f.file else ""
            lines.append(f" {f.check}{loc}")
            lines.append(f" {f.detail}")

    lines.append(f"\n{SEP}")
    lines.append(
        f" {len(report.passes)} pass | "
        f"{len(report.warnings)} warn | "
        f"{len(report.failures)} fail"
    )
    if report.failures:
        lines.append(" RESULT: ✗ FAIL")
    elif report.warnings:
        lines.append(" RESULT: ✓ PASS (with warnings)")
    else:
        lines.append(" RESULT: ✓ PASS")
    lines.append(SEP)
    return "\n".join(lines)


# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------

def main() -> None:
    """Parse CLI arguments, run the checks, print the report and exit.

    Exits with status 1 when the report contains any FAIL finding, else 0.
    """
    parser = argparse.ArgumentParser(
        description="ADR-001 compliance checker for custodian-ecosystem repos",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument("repo_path", help="Path to the repository root")
    parser.add_argument("--domain", dest="domain_slug", default=None,
                        help="Domain slug for orphan detection (e.g. custodian)")
    parser.add_argument("--api-base", default="http://127.0.0.1:8000",
                        help="State Hub API base URL")
    parser.add_argument("--no-api", action="store_true",
                        help="Skip state-hub API consistency checks")
    parser.add_argument("--json", action="store_true", dest="as_json",
                        help="Output JSON instead of text")
    args = parser.parse_args()

    report = validate(
        repo_path=Path(args.repo_path).resolve(),
        api_base=args.api_base,
        domain_slug=args.domain_slug,
        skip_api=args.no_api,
    )

    if args.as_json:
        print(json.dumps({
            "repo_path": report.repo_path,
            "findings": [
                {"level": f.level, "check": f.check, "detail": f.detail, "file": f.file}
                for f in report.findings
            ],
            "summary": {
                "pass": len(report.passes),
                "warn": len(report.warnings),
                "fail": len(report.failures),
            },
            "result": "fail" if report.failures else "warn" if report.warnings else "pass",
        }, indent=2))
    else:
        print(render_text(report))

    sys.exit(1 if report.failures else 0)


if __name__ == "__main__":
    main()