feat(custodian): add ADR-001 compliance validator

Scripts, Makefile target, and MCP tool for checking a repository
against ADR-001 (workplans as repo artefacts, state-hub as cache).

Checks performed:
  File-side: workplans/ dir exists, valid YAML frontmatter (required
  fields, type, status, id format), filename matches id, embedded
  task blocks have id/status/priority.

  State-hub cross-reference: state_hub_workstream_id references
  resolve to real DB records; orphan detection flags active DB
  workstreams with no backing workplan file.

Usage:
  make validate-adr REPO=<path> [DOMAIN=<slug>]
  validate_repo_adr(repo_path, domain_slug?)  # MCP tool

Running against the-custodian itself correctly surfaces the 4
pre-ADR-001 workstreams that still need workplan files written.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-28 12:00:09 +01:00
parent 0546a1bb2a
commit c3efb099f1
4 changed files with 548 additions and 1 deletions

View File

@@ -1,4 +1,4 @@
.PHONY: install install-cli db db-tools migrate seed api dashboard check start clean register-project .PHONY: install install-cli db db-tools migrate seed api dashboard check start clean register-project validate-adr
COMPOSE = docker compose -f infra/docker-compose.yml --env-file .env COMPOSE = docker compose -f infra/docker-compose.yml --env-file .env
@@ -45,5 +45,10 @@ register-project:
@test -n "$(PROJECT_PATH)" || (echo "ERROR: PROJECT_PATH is required."; exit 1) @test -n "$(PROJECT_PATH)" || (echo "ERROR: PROJECT_PATH is required."; exit 1)
scripts/register_project.sh "$(DOMAIN)" "$(PROJECT_PATH)" scripts/register_project.sh "$(DOMAIN)" "$(PROJECT_PATH)"
## Check a repo for ADR-001 compliance: make validate-adr REPO=/path/to/repo [DOMAIN=custodian]
validate-adr:
@test -n "$(REPO)" || (echo "ERROR: REPO is required. Usage: make validate-adr REPO=<path> [DOMAIN=<slug>]"; exit 1)
uv run python scripts/validate_repo_adr.py "$(REPO)" $(if $(DOMAIN),--domain "$(DOMAIN)",)
clean: clean:
$(COMPOSE) down -v $(COMPOSE) down -v

View File

@@ -57,6 +57,14 @@ Do not use them as a substitute for formal work definition inside the domain rep
--- ---
## Governance Tools
| Tool | Key Args | When to use |
|------|----------|-------------|
| `validate_repo_adr(repo_path, domain_slug?)` | `repo_path`: absolute path; `domain_slug?`: for orphan detection | Check a repo against ADR-001. Detects missing workplans/ dir, invalid frontmatter, stale workstream ID references, and DB-only orphan workstreams. Run before and after any workplan changes. |
---
## Resources (URI-addressable, read-only) ## Resources (URI-addressable, read-only)
| URI | Returns | | URI | Returns |

View File

@@ -10,6 +10,7 @@ import os
import re import re
import sys import sys
from datetime import datetime from datetime import datetime
from pathlib import Path
from typing import Any from typing import Any
from uuid import UUID from uuid import UUID
@@ -629,6 +630,71 @@ def update_td_status(td_uuid: str, status: str) -> str:
return json.dumps(td, indent=2) return json.dumps(td, indent=2)
# ---------------------------------------------------------------------------
# ADR-001 compliance validation
# ---------------------------------------------------------------------------
@mcp.tool()
def validate_repo_adr(repo_path: str, domain_slug: str | None = None) -> str:
    """Check whether a repository is consistent with ADR-001.

    Runs ``scripts/validate_repo_adr.py`` (located relative to this file) in a
    subprocess with ``--json`` and ``--api-base API_BASE``, then renders the
    FAIL and WARN findings, the pass/warn/fail counts and the overall verdict
    as plain text.

    Args:
        repo_path: Absolute path to the repository root.
        domain_slug: Domain slug for orphan detection (e.g. 'custodian').
            Forwarded as ``--domain`` only when truthy; if omitted, the
            script infers domains from workplan frontmatter.

    Returns:
        The rendered report, or a "Validator script error" message carrying
        the script's stderr/stdout when its stdout is not valid JSON.
    """
    import subprocess

    validator = Path(__file__).parent.parent / "scripts" / "validate_repo_adr.py"
    argv = [sys.executable, str(validator), repo_path, "--json", "--api-base", API_BASE]
    if domain_slug:
        argv.extend(["--domain", domain_slug])

    # The script exits 1 when it records FAIL findings but still prints JSON,
    # so the return code is ignored and stdout is parsed directly.
    proc = subprocess.run(argv, capture_output=True, text=True)
    try:
        payload = json.loads(proc.stdout)
    except json.JSONDecodeError:
        return f"Validator script error:\n{proc.stderr or proc.stdout or '(no output)'}"

    findings = payload.get("findings", [])
    counts = payload.get("summary", {})
    verdict = payload.get("result", "unknown")

    # Partition up front so a malformed finding raises before any rendering.
    sections = (
        ("FAILURES", "FAIL", [item for item in findings if item["level"] == "FAIL"]),
        ("WARNINGS", "WARN", [item for item in findings if item["level"] == "WARN"]),
    )

    out = [f"ADR-001 Compliance: {repo_path}", ""]
    for heading, tag, selected in sections:
        if not selected:
            continue
        out.append(f"{heading} ({len(selected)}):")
        for item in selected:
            where = f" [{item['file']}]" if item.get("file") else ""
            out.append(f" {tag} {item['check']}{where}")
            out.append(f" {item['detail']}")
        out.append("")

    out.append(
        f"Summary: {counts.get('pass', 0)} pass | "
        f"{counts.get('warn', 0)} warn | "
        f"{counts.get('fail', 0)} fail"
    )

    if verdict == "fail":
        label = "FAIL"
    elif verdict == "warn":
        label = "PASS (with warnings)"
    else:
        label = "PASS"
    out.append(f"Result: {label}")
    return "\n".join(out)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Entry point # Entry point
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------

View File

@@ -0,0 +1,468 @@
#!/usr/bin/env python3
"""validate_repo_adr.py — ADR-001 compliance checker.
Checks whether a repository is consistent with ADR-001: workplans and
work items must originate as Markdown files in the native repository;
the state-hub is a read/cache layer, never the origin.
Checks performed:
File-side (no API required):
1. workplans/ directory exists
2. Each .md file has valid YAML frontmatter with required fields
3. type == "workplan", status in valid set, id matches pattern
4. Filename starts with the id value
5. Embedded ```task blocks have id and status fields (priority is checked as a warning)
State-hub cross-reference (requires API):
6. state_hub_workstream_id references resolve to real DB records
7. Orphan detection: DB workstreams for the domain with no backing file
Usage:
python scripts/validate_repo_adr.py <repo_path> [OPTIONS]
Options:
--domain SLUG Domain slug for orphan detection
--api-base URL State Hub API (default: http://127.0.0.1:8000)
--no-api Skip state-hub consistency checks
--json Output JSON instead of text
Exit codes:
0 — no FAIL findings (WARN findings do not affect the exit code)
1 — one or more FAIL findings
"""
from __future__ import annotations
import argparse
import json
import re
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
try:
import yaml as _yaml
_HAS_YAML = True
except ImportError:
_HAS_YAML = False
try:
import httpx as _httpx
_HAS_HTTPX = True
except ImportError:
_HAS_HTTPX = False
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Frontmatter keys every workplan file must define.
REQUIRED_FRONTMATTER = {"id", "type", "title", "domain", "status", "owner", "created"}
# Accepted values for a workplan's `status` field.
VALID_WP_STATUSES = {"active", "completed", "archived"}
# Accepted values for a task block's `status` field.
VALID_TASK_STATUSES = {"todo", "in_progress", "blocked", "done", "cancelled"}
# Accepted values for a task block's `priority` field.
VALID_TASK_PRIORITIES = {"low", "medium", "high", "critical"}
# Workplan id: uppercase letters, "-WP-", digits (e.g. CUST-WP-0001).
_WP_ID_RE = re.compile(r"^[A-Z]+-WP-\d+$")
# Task id: a workplan id followed by "-T" and digits.
_TASK_ID_RE = re.compile(r"^[A-Z]+-WP-\d+-T\d+$")
# Captures the body of a fenced ```task block; DOTALL lets the body span lines.
_TASK_BLOCK_RE = re.compile(r"```task\s*\n(.*?)\n```", re.DOTALL)
# ---------------------------------------------------------------------------
# Data types
# ---------------------------------------------------------------------------
class Level:
    """Severity labels attached to each finding."""

    PASS = "PASS"
    WARN = "WARN"
    FAIL = "FAIL"
@dataclass
class Finding:
    """A single check result recorded by the validator."""

    level: str  # one of the Level constants
    check: str  # short identifier of the check, e.g. "frontmatter-type"
    detail: str  # human-readable explanation of the outcome
    file: str = ""  # file name, "file#task[n]" or workplan id; empty for repo-wide checks
@dataclass
class Report:
    """Accumulates the findings produced while validating one repository."""

    repo_path: str
    findings: list[Finding] = field(default_factory=list)

    def add(self, level: str, check: str, detail: str, file: str = "") -> None:
        """Append a new finding to the report."""
        entry = Finding(level=level, check=check, detail=detail, file=file)
        self.findings.append(entry)

    def _with_level(self, level: str) -> list[Finding]:
        """Return the findings whose level equals *level*, in insertion order."""
        return [entry for entry in self.findings if entry.level == level]

    @property
    def failures(self) -> list[Finding]:
        """Findings recorded at FAIL level."""
        return self._with_level(Level.FAIL)

    @property
    def warnings(self) -> list[Finding]:
        """Findings recorded at WARN level."""
        return self._with_level(Level.WARN)

    @property
    def passes(self) -> list[Finding]:
        """Findings recorded at PASS level."""
        return self._with_level(Level.PASS)
# ---------------------------------------------------------------------------
# Parsing helpers
# ---------------------------------------------------------------------------
def _parse_yaml_block(raw: str) -> dict:
    """Parse a YAML string into a dict, with fallback to simple key:value.

    When PyYAML is available the text is loaded with ``safe_load``. An empty
    or otherwise falsy document yields ``{}``; a document that fails to parse,
    or whose top level is not a mapping (a bare scalar or a list), yields
    ``{"_parse_error": True}`` so callers can always use ``.get()`` on the
    result.

    Without PyYAML a minimal parser handles flat ``key: value`` lines only:
    lines starting with a space and ``#`` comment lines are ignored, and
    surrounding quotes are stripped from values.
    """
    if _HAS_YAML:
        try:
            loaded = _yaml.safe_load(raw)
        except _yaml.YAMLError:
            return {"_parse_error": True}
        if not loaded:
            return {}
        # safe_load returns whatever the document's top level is; anything
        # other than a mapping cannot be frontmatter or a task definition.
        if not isinstance(loaded, dict):
            return {"_parse_error": True}
        return loaded

    # Minimal fallback: flat key: value only
    result: dict = {}
    for line in raw.splitlines():
        if line.startswith((" ", "#")) or ":" not in line:
            continue
        k, _, v = line.partition(":")
        result[k.strip()] = v.strip().strip('"').strip("'")
    return result
# Frontmatter is delimited by two lines consisting solely of "---" (optional
# trailing blanks). Anchoring the closing delimiter to a line start keeps a
# "---" inside a YAML value from terminating the block early.
_FRONTMATTER_RE = re.compile(
    r"\A---[ \t]*\r?\n(.*?)^---[ \t]*\r?$",
    re.DOTALL | re.MULTILINE,
)


def parse_frontmatter(text: str) -> tuple[dict, str]:
    """Split YAML frontmatter from body. Returns ({}, text) if no frontmatter.

    The text must open with a ``---`` line and contain a later ``---`` line of
    its own; otherwise it is treated as having no frontmatter. On success the
    parsed metadata is returned together with everything after the closing
    delimiter (including the newline that follows it).
    """
    match = _FRONTMATTER_RE.match(text)
    if match is None:
        return {}, text
    meta = _parse_yaml_block(match.group(1).strip())
    return meta, text[match.end():]
def parse_task_blocks(body: str) -> list[dict]:
    """Return the parsed YAML of every fenced ```task block in *body*, in order."""
    parsed = []
    for match in _TASK_BLOCK_RE.finditer(body):
        block_text = match.group(1).strip()
        parsed.append(_parse_yaml_block(block_text))
    return parsed
# ---------------------------------------------------------------------------
# File-side checks
# ---------------------------------------------------------------------------
def _scalar_text(value: Any) -> str:
    """Return *value* as text, mapping a missing/null value (None) to ""."""
    return "" if value is None else str(value)


def _check_task_block(task: Any, index: int, fname: str, report: Report) -> None:
    """Validate the id, status and priority of one parsed ```task block."""
    tref = f"{fname}#task[{index}]"
    # A block whose YAML is not a mapping cannot carry the expected fields.
    if not isinstance(task, dict) or task.get("_parse_error"):
        report.add(Level.FAIL, "task-parseable", f"Task block {index} failed to parse", tref)
        return

    t_id = _scalar_text(task.get("id"))
    if not t_id:
        report.add(Level.FAIL, "task-id", "Missing 'id' field", tref)
    elif not _TASK_ID_RE.match(t_id):
        report.add(Level.WARN, "task-id-format",
                   f"id {t_id!r} doesn't match [A-Z]+-WP-\\d+-T\\d+", tref)

    t_status = _scalar_text(task.get("status"))
    if not t_status:
        report.add(Level.FAIL, "task-status", "Missing 'status' field", tref)
    elif t_status not in VALID_TASK_STATUSES:
        report.add(Level.FAIL, "task-status-value",
                   f"status {t_status!r} not in {sorted(VALID_TASK_STATUSES)}", tref)

    # Priority problems are warnings only; they never fail the report.
    t_prio = _scalar_text(task.get("priority"))
    if not t_prio:
        report.add(Level.WARN, "task-priority", "Missing 'priority' field", tref)
    elif t_prio not in VALID_TASK_PRIORITIES:
        report.add(Level.WARN, "task-priority-value",
                   f"priority {t_prio!r} not in {sorted(VALID_TASK_PRIORITIES)}", tref)


def _check_workplan_file(wp_file: Path, report: Report) -> dict | None:
    """Validate one workplan file. Returns parsed frontmatter on success.

    Findings are appended to *report*. ``None`` is returned when the file
    cannot be read or decoded as UTF-8, does not start with ``---``, or has
    frontmatter that cannot be parsed into a mapping. In every other case the
    frontmatter dict is returned, even if individual field checks failed.

    Fields present but null in the YAML (e.g. ``domain:``) are treated as
    empty rather than as the literal text "None".
    """
    fname = wp_file.name
    try:
        text = wp_file.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as e:
        # UnicodeDecodeError is not an OSError; a non-UTF-8 file must be
        # reported as a finding instead of aborting the whole run.
        report.add(Level.FAIL, "file-readable", str(e), fname)
        return None

    if not text.startswith("---"):
        report.add(Level.FAIL, "frontmatter-present",
                   "File does not start with '---'; YAML frontmatter required", fname)
        return None

    meta, body = parse_frontmatter(text)
    if not meta or not isinstance(meta, dict) or meta.get("_parse_error"):
        report.add(Level.FAIL, "frontmatter-parseable",
                   "YAML frontmatter could not be parsed", fname)
        return None

    # Required fields
    missing = REQUIRED_FRONTMATTER - set(meta.keys())
    if missing:
        report.add(Level.FAIL, "frontmatter-required-fields",
                   f"Missing fields: {', '.join(sorted(missing))}", fname)
    else:
        report.add(Level.PASS, "frontmatter-required-fields",
                   "All required fields present", fname)

    # type
    if meta.get("type") != "workplan":
        report.add(Level.FAIL, "frontmatter-type",
                   f"type must be 'workplan', got {meta.get('type')!r}", fname)
    else:
        report.add(Level.PASS, "frontmatter-type", "type=workplan", fname)

    # status
    status = _scalar_text(meta.get("status"))
    if status not in VALID_WP_STATUSES:
        report.add(Level.FAIL, "frontmatter-status",
                   f"status must be one of {sorted(VALID_WP_STATUSES)}, got {status!r}", fname)
    else:
        report.add(Level.PASS, "frontmatter-status", f"status={status}", fname)

    # id format
    wp_id = _scalar_text(meta.get("id"))
    if not _WP_ID_RE.match(wp_id):
        report.add(Level.FAIL, "frontmatter-id-format",
                   f"id must match [A-Z]+-WP-\\d+ (e.g. CUST-WP-0001), got {wp_id!r}", fname)
    else:
        report.add(Level.PASS, "frontmatter-id-format", f"id={wp_id}", fname)

    # filename prefix (skipped when there is no id to compare against)
    if wp_id and not fname.startswith(wp_id):
        report.add(Level.WARN, "filename-id-prefix",
                   f"Filename should start with id '{wp_id}', got {fname!r}", fname)
    elif wp_id:
        report.add(Level.PASS, "filename-id-prefix", "Filename matches id prefix", fname)

    # domain non-empty
    domain = _scalar_text(meta.get("domain")).strip()
    if not domain:
        report.add(Level.FAIL, "frontmatter-domain", "domain must be a non-empty string", fname)
    else:
        report.add(Level.PASS, "frontmatter-domain", f"domain={domain}", fname)

    # task blocks
    tasks = parse_task_blocks(body)
    if not tasks:
        report.add(Level.WARN, "tasks-present",
                   "No ```task blocks found — intentional for a workplan with no tasks?", fname)
    else:
        report.add(Level.PASS, "tasks-present", f"{len(tasks)} task block(s) found", fname)
    for i, task in enumerate(tasks, 1):
        _check_task_block(task, i, fname, report)

    return meta
def check_files(workplans_dir: Path, report: Report) -> list[dict]:
    """Validate every ``*.md`` file directly inside *workplans_dir*.

    Files are processed in sorted order. Returns the frontmatter dicts of the
    files that produced one; a WARN finding is recorded and an empty list is
    returned when the directory holds no ``.md`` files.
    """
    candidates = sorted(workplans_dir.glob("*.md"))
    if not candidates:
        report.add(Level.WARN, "workplans-not-empty",
                   "workplans/ directory exists but contains no .md files")
        return []

    results = (_check_workplan_file(path, report) for path in candidates)
    return [meta for meta in results if meta]
# ---------------------------------------------------------------------------
# State-hub API checks
# ---------------------------------------------------------------------------
def _api_get(api_base: str, path: str, params: dict | None = None) -> Any:
    """GET *path* from the State Hub API and return the decoded JSON.

    A trailing slash is appended to *path* when missing, and parameters whose
    value is None are dropped. Returns None when httpx is not installed or
    when the request fails for any reason (connection error, non-2xx status,
    invalid JSON) — callers cannot distinguish these cases.
    """
    if not _HAS_HTTPX:
        return None

    url_path = path if path.endswith("/") else path + "/"
    try:
        with _httpx.Client(base_url=api_base, timeout=10.0, follow_redirects=True) as client:
            query = {key: value for key, value in (params or {}).items() if value is not None}
            response = client.get(url_path, params=query)
            response.raise_for_status()
            return response.json()
    except Exception:
        # Deliberate best-effort: any failure is reported to the caller as None.
        return None
def _verify_workstream_refs(api_base: str, metas: list[dict], report: Report) -> set[str]:
    """Check each workplan's state_hub_workstream_id; return the ids found in files."""
    file_ws_ids: set[str] = set()
    for meta in metas:
        wp_ref = str(meta.get("id", ""))
        # A key that is present but null must count as "no id", not as the text "None".
        raw_ws_id = meta.get("state_hub_workstream_id")
        ws_id = "" if raw_ws_id is None else str(raw_ws_id).strip()
        if not ws_id:
            report.add(Level.WARN, "workstream-id-present",
                       f"Workplan {meta.get('id')} has no state_hub_workstream_id "
                       f"— not indexed in state-hub",
                       wp_ref)
            continue
        file_ws_ids.add(ws_id)
        # NOTE(review): _api_get returns None for any request failure, so a
        # transient API error is also reported as a stale reference here.
        ws = _api_get(api_base, f"/workstreams/{ws_id}")
        if ws is None:
            report.add(Level.FAIL, "workstream-ref-exists",
                       f"state_hub_workstream_id {ws_id} not found in DB (stale reference)",
                       wp_ref)
        else:
            slug = ws.get("slug") if isinstance(ws, dict) else None
            report.add(Level.PASS, "workstream-ref-exists",
                       f"Workstream {ws_id[:8]}… ({slug}) confirmed in DB",
                       wp_ref)
    return file_ws_ids


def _report_orphans(api_base: str, domains: set[str], file_ws_ids: set[str],
                    report: Report) -> None:
    """Flag non-completed, non-archived DB workstreams in *domains* with no workplan file."""
    topics = _api_get(api_base, "/topics")
    if not isinstance(topics, list):
        report.add(Level.WARN, "orphan-detection", "Could not fetch topics for orphan detection")
        return

    for topic in topics:
        t_domain = topic.get("domain", "")
        if t_domain not in domains:
            continue
        t_id = topic.get("id")
        if t_id is None:
            report.add(Level.WARN, "orphan-detection",
                       f"Topic record without id (domain={t_domain}) — skipped")
            continue
        workstreams = _api_get(api_base, "/workstreams", {"topic_id": t_id})
        if not isinstance(workstreams, list):
            report.add(Level.WARN, "orphan-detection",
                       f"Could not fetch workstreams for topic {str(t_id)[:8]}… (domain={t_domain})")
            continue
        for ws in workstreams:
            if ws.get("status", "") in ("completed", "archived"):
                continue
            raw_id = ws.get("id")
            ws_slug = ws.get("slug", "")
            if raw_id is None:
                report.add(Level.WARN, "orphan-detection",
                           f"Workstream record '{ws_slug}' without id (domain={t_domain}) — skipped")
                continue
            # Ids collected from files are strings; normalise before comparing.
            ws_id = str(raw_id)
            if ws_id not in file_ws_ids:
                report.add(
                    Level.FAIL, "orphan-workstream",
                    f"Active workstream '{ws_slug}' (id={ws_id[:8]}…, domain={t_domain}) "
                    f"exists in DB but has no backing workplan file — ADR-001 violation",
                )
            else:
                report.add(Level.PASS, "orphan-workstream",
                           f"Workstream '{ws_slug}' is backed by a workplan file")


def check_api(api_base: str, metas: list[dict], domain_slug: str | None,
              report: Report) -> None:
    """Cross-reference workplan files against the live state-hub database.

    Steps, each recording findings on *report*:
      1. Probe ``/state/health``; if it yields nothing, record a WARN and stop.
      2. Resolve every ``state_hub_workstream_id`` found in *metas*.
      3. For each domain (``domain_slug`` plus the ``domain`` of every
         workplan), flag workstreams that are neither completed nor archived
         and are not referenced by any workplan file.

    Args:
        api_base: Base URL of the State Hub API.
        metas: Frontmatter dicts of the workplan files that were parsed.
        domain_slug: Extra domain to include in orphan detection, or None.
        report: Report that receives the findings.
    """
    health = _api_get(api_base, "/state/health")
    if health is None:
        report.add(Level.WARN, "api-reachable",
                   f"State Hub API not reachable at {api_base} — skipping cross-reference checks")
        return
    report.add(Level.PASS, "api-reachable", f"State Hub API reachable at {api_base}")

    # Verify each state_hub_workstream_id reference
    file_ws_ids = _verify_workstream_refs(api_base, metas, report)

    # Orphan detection: DB workstreams with no backing file
    domains_to_check: set[str] = set()
    if domain_slug:
        domains_to_check.add(domain_slug)
    for meta in metas:
        raw_domain = meta.get("domain")
        # Skip null domains instead of searching for a domain named "None".
        d = "" if raw_domain is None else str(raw_domain).strip()
        if d:
            domains_to_check.add(d)
    if not domains_to_check:
        report.add(Level.WARN, "orphan-detection",
                   "No domain slugs available for orphan detection — pass --domain to enable")
        return

    _report_orphans(api_base, domains_to_check, file_ws_ids, report)
# ---------------------------------------------------------------------------
# Top-level runner
# ---------------------------------------------------------------------------
def validate(repo_path: Path, api_base: str = "http://127.0.0.1:8000",
             domain_slug: str | None = None, skip_api: bool = False) -> Report:
    """Run all ADR-001 checks for a repository. Returns a Report.

    A missing ``workplans/`` directory is recorded as a FAIL and ends the run.
    Otherwise the file-side checks run, followed by the state-hub
    cross-reference checks unless *skip_api* is true.
    """
    report = Report(repo_path=str(repo_path))
    workplans_dir = repo_path / "workplans"

    if workplans_dir.is_dir():
        report.add(Level.PASS, "workplans-dir", "workplans/ directory exists")
        metas = check_files(workplans_dir, report)
        if not skip_api:
            check_api(api_base, metas, domain_slug, report)
    else:
        report.add(Level.FAIL, "workplans-dir",
                   "No workplans/ directory found. "
                   "ADR-001 requires workplan files at <repo>/workplans/<ID>-<slug>.md")
    return report
def render_text(report: Report) -> str:
    """Render a Report as human-readable text.

    Findings are grouped FAIL, WARN, PASS (empty groups are omitted), followed
    by the pass/warn/fail counts and a one-line overall result.
    """
    rule = "=" * 62
    out = ["ADR-001 Compliance Report", f"Repo: {report.repo_path}", rule]

    for level in (Level.FAIL, Level.WARN, Level.PASS):
        matching = [entry for entry in report.findings if entry.level == level]
        if matching:
            out.append(f"\n {level}S ({len(matching)}):")
            for entry in matching:
                where = f" [{entry.file}]" if entry.file else ""
                out.extend((f" {entry.check}{where}", f" {entry.detail}"))

    out.append(f"\n{rule}")
    out.append(
        f" {len(report.passes)} pass | "
        f"{len(report.warnings)} warn | "
        f"{len(report.failures)} fail"
    )

    if report.failures:
        verdict = " RESULT: ✗ FAIL"
    elif report.warnings:
        verdict = " RESULT: ✓ PASS (with warnings)"
    else:
        verdict = " RESULT: ✓ PASS"
    out.extend((verdict, rule))
    return "\n".join(out)
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
def main() -> None:
    """Parse the command line, run the validation and print the report.

    Prints JSON when ``--json`` is given, text otherwise, then exits with
    status 1 if the report contains any FAIL finding and 0 otherwise.
    """
    cli = argparse.ArgumentParser(
        description="ADR-001 compliance checker for custodian-ecosystem repos",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    cli.add_argument("repo_path", help="Path to the repository root")
    cli.add_argument("--domain", dest="domain_slug", default=None,
                     help="Domain slug for orphan detection (e.g. custodian)")
    cli.add_argument("--api-base", default="http://127.0.0.1:8000",
                     help="State Hub API base URL")
    cli.add_argument("--no-api", action="store_true",
                     help="Skip state-hub API consistency checks")
    cli.add_argument("--json", action="store_true", dest="as_json",
                     help="Output JSON instead of text")
    opts = cli.parse_args()

    report = validate(
        repo_path=Path(opts.repo_path).resolve(),
        api_base=opts.api_base,
        domain_slug=opts.domain_slug,
        skip_api=opts.no_api,
    )

    if not opts.as_json:
        print(render_text(report))
    else:
        if report.failures:
            outcome = "fail"
        elif report.warnings:
            outcome = "warn"
        else:
            outcome = "pass"
        payload = {
            "repo_path": report.repo_path,
            "findings": [
                {
                    "level": entry.level,
                    "check": entry.check,
                    "detail": entry.detail,
                    "file": entry.file,
                }
                for entry in report.findings
            ],
            "summary": {
                "pass": len(report.passes),
                "warn": len(report.warnings),
                "fail": len(report.failures),
            },
            "result": outcome,
        }
        print(json.dumps(payload, indent=2))

    sys.exit(1 if report.failures else 0)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()