diff --git a/state-hub/Makefile b/state-hub/Makefile index 3a5ce65..77f1630 100644 --- a/state-hub/Makefile +++ b/state-hub/Makefile @@ -1,4 +1,4 @@ -.PHONY: install install-cli db db-tools migrate seed api dashboard check start clean register-project +.PHONY: install install-cli db db-tools migrate seed api dashboard check start clean register-project validate-adr COMPOSE = docker compose -f infra/docker-compose.yml --env-file .env @@ -45,5 +45,10 @@ register-project: @test -n "$(PROJECT_PATH)" || (echo "ERROR: PROJECT_PATH is required."; exit 1) scripts/register_project.sh "$(DOMAIN)" "$(PROJECT_PATH)" +## Check a repo for ADR-001 compliance: make validate-adr REPO=/path/to/repo [DOMAIN=custodian] +validate-adr: + @test -n "$(REPO)" || (echo "ERROR: REPO is required. Usage: make validate-adr REPO=<path> [DOMAIN=<slug>]"; exit 1) + uv run python scripts/validate_repo_adr.py "$(REPO)" $(if $(DOMAIN),--domain "$(DOMAIN)",) + clean: $(COMPOSE) down -v diff --git a/state-hub/mcp_server/TOOLS.md b/state-hub/mcp_server/TOOLS.md index 6c3b830..f1b2900 100644 --- a/state-hub/mcp_server/TOOLS.md +++ b/state-hub/mcp_server/TOOLS.md @@ -57,6 +57,14 @@ Do not use them as a substitute for formal work definition inside the domain rep --- +## Governance Tools + +| Tool | Key Args | When to use | +|------|----------|-------------| +| `validate_repo_adr(repo_path, domain_slug?)` | `repo_path`: absolute path; `domain_slug?`: for orphan detection | Check a repo against ADR-001. Detects missing workplans/ dir, invalid frontmatter, stale workstream ID references, and DB-only orphan workstreams. Run before and after any workplan changes. 
# ---------------------------------------------------------------------------
# ADR-001 compliance validation
# ---------------------------------------------------------------------------

@mcp.tool()
def validate_repo_adr(repo_path: str, domain_slug: str | None = None) -> str:
    """Check whether a repository is consistent with ADR-001.

    Runs ``scripts/validate_repo_adr.py`` in a subprocess with ``--json`` and
    renders its findings as plain text. The script validates that workplan
    files exist in workplans/ with correct frontmatter, that
    state_hub_workstream_id references resolve to real DB records, and that
    no active state-hub workstreams for the domain lack a backing file (orphan
    detection — DB-only records are an ADR-001 violation).

    Args:
        repo_path: Absolute path to the repository root.
        domain_slug: Domain slug for orphan detection (e.g. 'custodian').
                     If omitted, inferred from workplan frontmatter.

    Returns:
        A text report listing FAIL and WARN findings followed by a summary
        and an overall result line, or a "Validator script error" message
        when the script did not print a JSON object.
    """
    import subprocess

    script = Path(__file__).parent.parent / "scripts" / "validate_repo_adr.py"
    cmd = [sys.executable, str(script), "--json", "--api-base", API_BASE]
    if domain_slug:
        # "--opt=value" keeps a slug that starts with "-" from being parsed
        # as a separate option.
        cmd.append(f"--domain={domain_slug}")
    # "--" ends option parsing so repo_path is always the positional argument.
    cmd += ["--", repo_path]

    # The script exits 1 when it reports FAIL findings, so the return code is
    # not treated as an error; only unparsable output is.
    result = subprocess.run(cmd, capture_output=True, text=True)
    try:
        data = json.loads(result.stdout)
    except json.JSONDecodeError:
        data = None
    if not isinstance(data, dict):
        return f"Validator script error:\n{result.stderr or result.stdout or '(no output)'}"

    # Ignore malformed entries instead of raising KeyError/AttributeError.
    findings = [f for f in data.get("findings") or [] if isinstance(f, dict)]
    summary = data.get("summary") or {}
    overall = data.get("result", "unknown")

    lines = [f"ADR-001 Compliance: {repo_path}", ""]

    for level, heading in (("FAIL", "FAILURES"), ("WARN", "WARNINGS")):
        section = [f for f in findings if f.get("level") == level]
        if not section:
            continue
        lines.append(f"{heading} ({len(section)}):")
        for finding in section:
            loc = f" [{finding['file']}]" if finding.get("file") else ""
            lines.append(f"  {level} {finding.get('check', '')}{loc}")
            lines.append(f"        {finding.get('detail', '')}")
        lines.append("")

    lines.append(
        f"Summary: {summary.get('pass', 0)} pass | "
        f"{summary.get('warn', 0)} warn | "
        f"{summary.get('fail', 0)} fail"
    )
    if overall == "fail":
        verdict = "FAIL"
    elif overall == "warn":
        verdict = "PASS (with warnings)"
    else:
        verdict = "PASS"
    lines.append(f"Result: {verdict}")
    return "\n".join(lines)
"""validate_repo_adr.py — ADR-001 compliance checker.

Checks whether a repository is consistent with ADR-001: workplans and
work items must originate as Markdown files in the native repository;
the state-hub is a read/cache layer, never the origin.

Checks performed:
  File-side (no API required):
    1. workplans/ directory exists
    2. Each .md file has valid YAML frontmatter with required fields
    3. type == "workplan", status in valid set, id matches pattern
    4. Filename starts with the id value
    5. Embedded ```task blocks have id and status fields

  State-hub cross-reference (requires API):
    6. state_hub_workstream_id references resolve to real DB records
    7. Orphan detection: DB workstreams for the domain with no backing file

Usage:
    python scripts/validate_repo_adr.py <repo_path> [OPTIONS]

  Options:
    --domain SLUG      Domain slug for orphan detection
    --api-base URL     State Hub API (default: http://127.0.0.1:8000)
    --no-api           Skip state-hub consistency checks
    --json             Output JSON instead of text

Exit codes:
  0 — all checks pass (including warnings)
  1 — one or more FAIL findings
"""
from __future__ import annotations

import argparse
import json
import re
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from urllib.parse import quote

try:
    import yaml as _yaml
    _HAS_YAML = True
except ImportError:
    _HAS_YAML = False

try:
    import httpx as _httpx
    _HAS_HTTPX = True
except ImportError:
    _HAS_HTTPX = False


# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

REQUIRED_FRONTMATTER = {"id", "type", "title", "domain", "status", "owner", "created"}
VALID_WP_STATUSES = {"active", "completed", "archived"}
VALID_TASK_STATUSES = {"todo", "in_progress", "blocked", "done", "cancelled"}
VALID_TASK_PRIORITIES = {"low", "medium", "high", "critical"}

_WP_ID_RE = re.compile(r"^[A-Z]+-WP-\d+$")
_TASK_ID_RE = re.compile(r"^[A-Z]+-WP-\d+-T\d+$")
_TASK_BLOCK_RE = re.compile(r"```task\s*\n(.*?)\n```", re.DOTALL)

# Outcomes of a single API request (see _api_request).
_API_OK = "ok"            # 2xx response with a JSON body
_API_MISSING = "missing"  # the server answered, but rejected/does not know the resource
_API_ERROR = "error"      # transport failure, other HTTP error, or non-JSON body

# HTTP statuses treated as "the referenced record does not exist / is invalid".
_MISSING_STATUS_CODES = frozenset({400, 404, 410, 422})

# Section headings used by render_text, keyed by finding level.
_SECTION_HEADINGS = {"FAIL": "FAILURES", "WARN": "WARNINGS", "PASS": "PASSES"}


# ---------------------------------------------------------------------------
# Data types
# ---------------------------------------------------------------------------

class Level:
    """Severity labels attached to findings."""

    PASS = "PASS"
    WARN = "WARN"
    FAIL = "FAIL"


@dataclass
class Finding:
    """One check outcome: severity, check name, detail text, optional file reference."""

    level: str
    check: str
    detail: str
    file: str = ""


@dataclass
class Report:
    """Ordered collection of findings produced for one repository."""

    repo_path: str
    findings: list[Finding] = field(default_factory=list)

    def add(self, level: str, check: str, detail: str, file: str = "") -> None:
        """Append a finding to the report."""
        self.findings.append(Finding(level=level, check=check, detail=detail, file=file))

    @property
    def failures(self) -> list[Finding]:
        """Findings with level FAIL."""
        return [f for f in self.findings if f.level == Level.FAIL]

    @property
    def warnings(self) -> list[Finding]:
        """Findings with level WARN."""
        return [f for f in self.findings if f.level == Level.WARN]

    @property
    def passes(self) -> list[Finding]:
        """Findings with level PASS."""
        return [f for f in self.findings if f.level == Level.PASS]


# ---------------------------------------------------------------------------
# Parsing helpers
# ---------------------------------------------------------------------------

def _as_text(value: Any) -> str:
    """Return *value* as a stripped string; ``None`` (a blank YAML value) becomes ``""``."""
    return "" if value is None else str(value).strip()


def _parse_yaml_block(raw: str) -> dict:
    """Parse a YAML string into a dict, with fallback to simple key:value.

    Returns ``{}`` for an empty document and ``{"_parse_error": True}`` when
    the YAML is invalid or is not a mapping (a list or scalar cannot carry
    the fields the checks look up).
    """
    if _HAS_YAML:
        try:
            parsed = _yaml.safe_load(raw)
        except _yaml.YAMLError:
            return {"_parse_error": True}
        if parsed is None:
            return {}
        if not isinstance(parsed, dict):
            return {"_parse_error": True}
        return parsed
    # Minimal fallback: flat "key: value" lines only; indented (nested)
    # lines and comment lines are ignored.
    result: dict = {}
    for line in raw.splitlines():
        if line.startswith((" ", "\t", "#")) or ":" not in line:
            continue
        key, _, value = line.partition(":")
        result[key.strip()] = value.strip().strip('"').strip("'")
    return result


def parse_frontmatter(text: str) -> tuple[dict, str]:
    """Split YAML frontmatter from body. Returns ({}, text) if no frontmatter.

    The frontmatter must open with a ``---`` line and is closed by the next
    line that consists solely of ``---``; a ``---`` that appears inside a
    value does not terminate it. The returned body starts right after the
    closing ``---`` characters.
    """
    if not text.startswith("---"):
        return {}, text
    lines = text.splitlines(keepends=True)
    if lines[0].rstrip() != "---":
        return {}, text
    for idx, line in enumerate(lines[1:], start=1):
        if line.rstrip() == "---":
            raw = "".join(lines[1:idx])
            body = line[3:] + "".join(lines[idx + 1:])
            return _parse_yaml_block(raw.strip()), body
    return {}, text


def parse_task_blocks(body: str) -> list[dict]:
    """Extract all ```task ... ``` YAML blocks from a workplan body."""
    return [_parse_yaml_block(m.group(1).strip()) for m in _TASK_BLOCK_RE.finditer(body)]


# ---------------------------------------------------------------------------
# File-side checks
# ---------------------------------------------------------------------------

def _check_frontmatter_fields(meta: dict, fname: str, report: Report) -> None:
    """Record findings for the required fields, type, status, id, filename and domain."""
    missing = REQUIRED_FRONTMATTER - set(meta.keys())
    if missing:
        report.add(Level.FAIL, "frontmatter-required-fields",
                   f"Missing fields: {', '.join(sorted(missing))}", fname)
    else:
        report.add(Level.PASS, "frontmatter-required-fields",
                   "All required fields present", fname)

    if meta.get("type") != "workplan":
        report.add(Level.FAIL, "frontmatter-type",
                   f"type must be 'workplan', got {meta.get('type')!r}", fname)
    else:
        report.add(Level.PASS, "frontmatter-type", "type=workplan", fname)

    status = _as_text(meta.get("status"))
    if status not in VALID_WP_STATUSES:
        report.add(Level.FAIL, "frontmatter-status",
                   f"status must be one of {sorted(VALID_WP_STATUSES)}, got {status!r}", fname)
    else:
        report.add(Level.PASS, "frontmatter-status", f"status={status}", fname)

    wp_id = _as_text(meta.get("id"))
    if not _WP_ID_RE.match(wp_id):
        report.add(Level.FAIL, "frontmatter-id-format",
                   f"id must match [A-Z]+-WP-\\d+ (e.g. CUST-WP-0001), got {wp_id!r}", fname)
    else:
        report.add(Level.PASS, "frontmatter-id-format", f"id={wp_id}", fname)

    # The filename check is skipped when there is no id to compare against.
    if wp_id and not fname.startswith(wp_id):
        report.add(Level.WARN, "filename-id-prefix",
                   f"Filename should start with id '{wp_id}', got {fname!r}", fname)
    elif wp_id:
        report.add(Level.PASS, "filename-id-prefix", "Filename matches id prefix", fname)

    domain = _as_text(meta.get("domain"))
    if not domain:
        report.add(Level.FAIL, "frontmatter-domain", "domain must be a non-empty string", fname)
    else:
        report.add(Level.PASS, "frontmatter-domain", f"domain={domain}", fname)


def _check_task_block(task: dict, index: int, tref: str, report: Report) -> None:
    """Record findings for one parsed task block (id, status, priority)."""
    if task.get("_parse_error"):
        report.add(Level.FAIL, "task-parseable", f"Task block {index} failed to parse", tref)
        return

    t_id = _as_text(task.get("id"))
    if not t_id:
        report.add(Level.FAIL, "task-id", "Missing 'id' field", tref)
    elif not _TASK_ID_RE.match(t_id):
        report.add(Level.WARN, "task-id-format",
                   f"id {t_id!r} doesn't match [A-Z]+-WP-\\d+-T\\d+", tref)

    t_status = _as_text(task.get("status"))
    if not t_status:
        report.add(Level.FAIL, "task-status", "Missing 'status' field", tref)
    elif t_status not in VALID_TASK_STATUSES:
        report.add(Level.FAIL, "task-status-value",
                   f"status {t_status!r} not in {sorted(VALID_TASK_STATUSES)}", tref)

    t_prio = _as_text(task.get("priority"))
    if not t_prio:
        report.add(Level.WARN, "task-priority", "Missing 'priority' field", tref)
    elif t_prio not in VALID_TASK_PRIORITIES:
        report.add(Level.WARN, "task-priority-value",
                   f"priority {t_prio!r} not in {sorted(VALID_TASK_PRIORITIES)}", tref)


def _check_workplan_file(wp_file: Path, report: Report) -> dict | None:
    """Validate one workplan file.

    Returns the parsed frontmatter when the file could be read and its
    frontmatter parsed (even if individual field checks failed), otherwise
    ``None``.
    """
    fname = wp_file.name
    try:
        # utf-8-sig also accepts plain UTF-8 and drops a leading BOM, which
        # would otherwise hide the opening '---'.
        text = wp_file.read_text(encoding="utf-8-sig")
    except (OSError, UnicodeDecodeError) as e:
        report.add(Level.FAIL, "file-readable", str(e), fname)
        return None

    if not text.startswith("---"):
        report.add(Level.FAIL, "frontmatter-present",
                   "File does not start with '---'; YAML frontmatter required", fname)
        return None

    meta, body = parse_frontmatter(text)
    if not meta or meta.get("_parse_error"):
        report.add(Level.FAIL, "frontmatter-parseable",
                   "YAML frontmatter could not be parsed", fname)
        return None

    _check_frontmatter_fields(meta, fname, report)

    tasks = parse_task_blocks(body)
    if not tasks:
        report.add(Level.WARN, "tasks-present",
                   "No ```task blocks found — intentional for a workplan with no tasks?", fname)
    else:
        report.add(Level.PASS, "tasks-present", f"{len(tasks)} task block(s) found", fname)

    for i, task in enumerate(tasks, 1):
        _check_task_block(task, i, f"{fname}#task[{i}]", report)

    return meta


def check_files(workplans_dir: Path, report: Report) -> list[dict]:
    """Check all workplan .md files in workplans_dir.

    Returns the frontmatter dicts of the files that could be parsed.
    """
    md_files = sorted(workplans_dir.glob("*.md"))
    if not md_files:
        report.add(Level.WARN, "workplans-not-empty",
                   "workplans/ directory exists but contains no .md files")
        return []
    metas = []
    for wp_file in md_files:
        meta = _check_workplan_file(wp_file, report)
        if meta:
            metas.append(meta)
    return metas


# ---------------------------------------------------------------------------
# State-hub API checks
# ---------------------------------------------------------------------------

def _api_request(api_base: str, path: str, params: dict | None = None) -> tuple[str, Any]:
    """GET *path* from the API and classify the outcome.

    Returns ``(outcome, payload)`` where outcome is ``_API_OK`` (payload is the
    decoded JSON), ``_API_MISSING`` (status in ``_MISSING_STATUS_CODES``) or
    ``_API_ERROR`` (httpx unavailable, transport failure, other HTTP error,
    or a body that is not JSON). Never raises.
    """
    if not _HAS_HTTPX:
        return _API_ERROR, None
    if not path.endswith("/"):
        path += "/"
    query = {k: v for k, v in (params or {}).items() if v is not None}
    try:
        with _httpx.Client(base_url=api_base, timeout=10.0, follow_redirects=True) as client:
            response = client.get(path, params=query)
            if response.status_code in _MISSING_STATUS_CODES:
                return _API_MISSING, None
            response.raise_for_status()
            return _API_OK, response.json()
    except Exception:
        # Deliberate best-effort boundary: any API problem is reported by the
        # caller as a finding and must not abort the validator.
        return _API_ERROR, None


def _api_get(api_base: str, path: str, params: dict | None = None) -> Any:
    """Return the decoded JSON for a successful GET, or ``None`` on any failure."""
    outcome, payload = _api_request(api_base, path, params)
    return payload if outcome == _API_OK else None


def _check_workstream_refs(api_base: str, metas: list[dict], report: Report) -> set[str]:
    """Verify each state_hub_workstream_id; return the set of ids found in files."""
    file_ws_ids: set[str] = set()
    for meta in metas:
        wp_ref = _as_text(meta.get("id"))
        ws_id = _as_text(meta.get("state_hub_workstream_id"))
        if not ws_id:
            report.add(Level.WARN, "workstream-id-present",
                       f"Workplan {meta.get('id')} has no state_hub_workstream_id "
                       f"— not indexed in state-hub",
                       wp_ref)
            continue
        file_ws_ids.add(ws_id)
        # Escape the id: it comes from a file and is placed in the URL path.
        outcome, ws = _api_request(api_base, f"/workstreams/{quote(ws_id, safe='')}")
        if outcome == _API_MISSING:
            report.add(Level.FAIL, "workstream-ref-exists",
                       f"state_hub_workstream_id {ws_id} not found in DB (stale reference)",
                       wp_ref)
        elif outcome != _API_OK or not isinstance(ws, dict):
            # A transport/server error says nothing about the reference, so it
            # is a warning like the other could-not-fetch cases.
            report.add(Level.WARN, "workstream-ref-exists",
                       f"Could not verify state_hub_workstream_id {ws_id} (API error)",
                       wp_ref)
        else:
            report.add(Level.PASS, "workstream-ref-exists",
                       f"Workstream {ws_id[:8]}… ({ws.get('slug')}) confirmed in DB",
                       wp_ref)
    return file_ws_ids


def _check_orphans(api_base: str, domains_to_check: set[str], file_ws_ids: set[str],
                   report: Report) -> None:
    """Flag non-completed, non-archived DB workstreams in the domains with no backing file."""
    topics = _api_get(api_base, "/topics")
    if not isinstance(topics, list):
        report.add(Level.WARN, "orphan-detection", "Could not fetch topics for orphan detection")
        return

    for topic in topics:
        if not isinstance(topic, dict):
            continue
        t_domain = _as_text(topic.get("domain"))
        if t_domain not in domains_to_check:
            continue
        raw_topic_id = topic.get("id")
        if raw_topic_id is None:
            report.add(Level.WARN, "orphan-detection",
                       f"Topic without id returned by API (domain={t_domain}) — skipped")
            continue
        t_id = str(raw_topic_id)
        workstreams = _api_get(api_base, "/workstreams", {"topic_id": t_id})
        if not isinstance(workstreams, list):
            report.add(Level.WARN, "orphan-detection",
                       f"Could not fetch workstreams for topic {t_id[:8]}… (domain={t_domain})")
            continue
        for ws in workstreams:
            if not isinstance(ws, dict):
                continue
            if ws.get("status", "") in ("completed", "archived"):
                continue
            # str() so non-string ids still compare against the file-side ids.
            ws_id = _as_text(ws.get("id"))
            ws_slug = ws.get("slug", "")
            if ws_id not in file_ws_ids:
                report.add(
                    Level.FAIL, "orphan-workstream",
                    f"Active workstream '{ws_slug}' (id={ws_id[:8]}…, domain={t_domain}) "
                    f"exists in DB but has no backing workplan file — ADR-001 violation",
                )
            else:
                report.add(Level.PASS, "orphan-workstream",
                           f"Workstream '{ws_slug}' is backed by a workplan file")


def check_api(api_base: str, metas: list[dict], domain_slug: str | None,
              report: Report) -> None:
    """Cross-reference workplan files against the live state-hub database.

    Adds a WARN and returns early when httpx is missing or the health
    endpoint cannot be fetched; otherwise verifies workstream references and
    runs orphan detection for *domain_slug* plus every domain named in
    *metas*.
    """
    if not _HAS_HTTPX:
        report.add(Level.WARN, "api-reachable",
                   "httpx is not installed — skipping cross-reference checks")
        return
    outcome, _ = _api_request(api_base, "/state/health")
    if outcome != _API_OK:
        report.add(Level.WARN, "api-reachable",
                   f"State Hub API not reachable at {api_base} — skipping cross-reference checks")
        return
    report.add(Level.PASS, "api-reachable", f"State Hub API reachable at {api_base}")

    file_ws_ids = _check_workstream_refs(api_base, metas, report)

    # Orphan detection: DB workstreams with no backing file
    domains_to_check: set[str] = set()
    if domain_slug:
        domains_to_check.add(domain_slug)
    for meta in metas:
        domain = _as_text(meta.get("domain"))
        if domain:
            domains_to_check.add(domain)

    if not domains_to_check:
        report.add(Level.WARN, "orphan-detection",
                   "No domain slugs available for orphan detection — pass --domain to enable")
        return

    _check_orphans(api_base, domains_to_check, file_ws_ids, report)


# ---------------------------------------------------------------------------
# Top-level runner
# ---------------------------------------------------------------------------

def validate(repo_path: Path, api_base: str = "http://127.0.0.1:8000",
             domain_slug: str | None = None, skip_api: bool = False) -> Report:
    """Run all ADR-001 checks for a repository. Returns a Report.

    When ``<repo_path>/workplans`` is not a directory the report contains a
    single FAIL and no further checks run. API checks are skipped when
    *skip_api* is true.
    """
    report = Report(repo_path=str(repo_path))

    workplans_dir = repo_path / "workplans"
    if not workplans_dir.is_dir():
        report.add(Level.FAIL, "workplans-dir",
                   "No workplans/ directory found. "
                   "ADR-001 requires workplan files at <repo>/workplans/<ID>-<slug>.md")
        return report
    report.add(Level.PASS, "workplans-dir", "workplans/ directory exists")

    metas = check_files(workplans_dir, report)

    if not skip_api:
        check_api(api_base, metas, domain_slug, report)

    return report


def render_text(report: Report) -> str:
    """Render a Report as human-readable text.

    Findings are grouped FAIL, then WARN, then PASS; empty groups are
    omitted. The text ends with the counts and an overall RESULT line.
    """
    sep = "=" * 62
    lines = ["ADR-001 Compliance Report", f"Repo: {report.repo_path}", sep]

    for level in (Level.FAIL, Level.WARN, Level.PASS):
        section = [f for f in report.findings if f.level == level]
        if not section:
            continue
        lines.append(f"\n  {_SECTION_HEADINGS[level]} ({len(section)}):")
        for finding in section:
            loc = f" [{finding.file}]" if finding.file else ""
            lines.append(f"    {finding.check}{loc}")
            lines.append(f"      {finding.detail}")

    lines.append(f"\n{sep}")
    lines.append(
        f"  {len(report.passes)} pass | "
        f"{len(report.warnings)} warn | "
        f"{len(report.failures)} fail"
    )
    if report.failures:
        lines.append("  RESULT: ✗ FAIL")
    elif report.warnings:
        lines.append("  RESULT: ✓ PASS (with warnings)")
    else:
        lines.append("  RESULT: ✓ PASS")
    lines.append(sep)
    return "\n".join(lines)


def _report_to_dict(report: Report) -> dict:
    """Build the JSON-serialisable structure printed by ``--json``."""
    if report.failures:
        result = "fail"
    elif report.warnings:
        result = "warn"
    else:
        result = "pass"
    return {
        "repo_path": report.repo_path,
        "findings": [
            {"level": f.level, "check": f.check, "detail": f.detail, "file": f.file}
            for f in report.findings
        ],
        "summary": {
            "pass": len(report.passes),
            "warn": len(report.warnings),
            "fail": len(report.failures),
        },
        "result": result,
    }


# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------

def main() -> None:
    """Parse CLI arguments, run the checks, print the report and exit 0 or 1."""
    parser = argparse.ArgumentParser(
        description="ADR-001 compliance checker for custodian-ecosystem repos",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument("repo_path", help="Path to the repository root")
    parser.add_argument("--domain", dest="domain_slug", default=None,
                        help="Domain slug for orphan detection (e.g. custodian)")
    parser.add_argument("--api-base", default="http://127.0.0.1:8000",
                        help="State Hub API base URL")
    parser.add_argument("--no-api", action="store_true",
                        help="Skip state-hub API consistency checks")
    parser.add_argument("--json", action="store_true", dest="as_json",
                        help="Output JSON instead of text")
    args = parser.parse_args()

    report = validate(
        repo_path=Path(args.repo_path).resolve(),
        api_base=args.api_base,
        domain_slug=args.domain_slug,
        skip_api=args.no_api,
    )

    if args.as_json:
        print(json.dumps(_report_to_dict(report), indent=2))
    else:
        print(render_text(report))

    sys.exit(1 if report.failures else 0)


if __name__ == "__main__":
    main()