generated from coulomb/repo-seed
Introduces a capability catalog (CUST-WP-0022) so domains can advertise what they provide and agents can request capabilities from other domains with auto-routing, lifecycle tracking, and task-unblocking on completion.

- New models: CapabilityCatalog, CapabilityRequest with full lifecycle (requested → accepted → in_progress → ready_for_review → completed/rejected/withdrawn)
- Migration i6d7e8f9a0b1: capability_catalog + capability_requests tables
- Router /capability-catalog and /capability-requests with accept/status endpoints
- 7 new MCP tools: register_capability, list_capabilities, request_capability, accept_capability_request, update_capability_request_status, list_capability_requests, get_capability_request
- StateSummary gains open_capability_requests count
- Dashboard: capability-requests.md page + docs/capabilities.md + docs/scope.md
- SCOPE.md: three seed capabilities documented (MCP registration, state tracking, SBOM)
- scope.template: Provided Capabilities section with example block
- scripts/ingest_capabilities.py + make ingest-capabilities[/-all] targets

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
254 lines
8.9 KiB
Python
254 lines
8.9 KiB
Python
#!/usr/bin/env python3
|
|
"""Ingest capability declarations from SCOPE.md files into the State Hub catalog.

Reads the fenced ``capability`` blocks (conventionally placed under a
``## Provided Capabilities`` heading) from SCOPE.md files in registered repos
and creates an entry in the capability_catalog table, via the API, for every
declaration that is not already present. Existing entries are left untouched.

Usage:
    python ingest_capabilities.py --repo <slug> [--repo-path <path>] [--dry-run]
    python ingest_capabilities.py --all [--dry-run]

Capability blocks in SCOPE.md use this format:

    ```capability
    type: infrastructure
    title: Cluster provisioning
    description: Provision k8s clusters for any domain.
    keywords: [cluster, k8s, privacy, instance]
    ```

Follows ADR-001: SCOPE.md files are the source of truth; the DB catalog is
a derived index that can be fully reconstructed from repo files.
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import socket
|
|
import sys
|
|
import urllib.error
|
|
import urllib.request
|
|
from pathlib import Path
|
|
|
|
# PyYAML is an optional dependency: when it cannot be imported the capability
# blocks are parsed by the small hand-written fallback instead.
try:
    import yaml
except ImportError:
    _YAML_AVAILABLE = False
else:
    _YAML_AVAILABLE = True

# Base URL of the State Hub API, without a trailing slash. Taken from the
# API_BASE environment variable when set; main() may override it via --api-base.
API_BASE = os.environ.get("API_BASE", "http://127.0.0.1:8000").rstrip("/")

# ---------------------------------------------------------------------------
# SCOPE.md parser
# ---------------------------------------------------------------------------

# Matches a fenced ```capability block and captures the text between the
# fences; DOTALL lets the captured body span several lines.
_CAPABILITY_BLOCK_RE = re.compile(
    r"```capability\s*\n(.*?)\n```",
    flags=re.DOTALL,
)
|
|
|
|
|
|
def parse_capabilities(scope_path: Path) -> list[dict]:
    """Extract capability declarations from a SCOPE.md file.

    Every fenced ``capability`` block matched by ``_CAPABILITY_BLOCK_RE`` is
    parsed with ``_parse_yaml_block``. Blocks without a truthy ``type`` or
    ``title`` are skipped.

    Args:
        scope_path: Location of the SCOPE.md file to read.

    Returns:
        One dict per valid block with the keys ``capability_type``, ``title``,
        ``description`` (``""`` when the block has none) and ``keywords``
        (``[]`` when the block has none). An empty list is returned when
        ``scope_path`` is not an existing regular file.
    """
    # is_file() also rejects a directory that happens to be named SCOPE.md,
    # which would otherwise make read_text() raise.
    if not scope_path.is_file():
        return []

    # Decode explicitly as UTF-8 instead of relying on the locale's preferred
    # encoding, so the result is the same on every host. Undecodable bytes are
    # replaced rather than aborting the whole ingest run.
    text = scope_path.read_text(encoding="utf-8", errors="replace")

    capabilities = []
    for block in _CAPABILITY_BLOCK_RE.findall(text):
        cap = _parse_yaml_block(block)
        if not (cap.get("type") and cap.get("title")):
            continue
        capabilities.append({
            "capability_type": cap["type"],
            "title": cap["title"],
            "description": cap.get("description", ""),
            "keywords": cap.get("keywords", []),
        })
    return capabilities
|
|
|
|
|
|
def _parse_yaml_block(text: str) -> dict:
    """Turn the body of a capability block into a dict.

    When PyYAML was importable it is tried first; its result is used only if
    it is a mapping. If PyYAML is unavailable, raises ``YAMLError``, or yields
    anything other than a dict, a minimal line-based parser takes over. That
    parser understands flat ``key: value`` lines and inline ``[a, b, c]``
    lists, and strips surrounding quotes from values and list items.
    """
    if _YAML_AVAILABLE:
        try:
            loaded = yaml.safe_load(text)
        except yaml.YAMLError:
            loaded = None
        if isinstance(loaded, dict):
            return loaded

    # Fallback: manual key: value parsing
    parsed = {}
    for raw_line in text.strip().splitlines():
        entry = raw_line.strip()
        # Blank lines contain no colon either, so one test covers both cases.
        if ":" not in entry:
            continue
        name, _, value = entry.partition(":")
        name, value = name.strip(), value.strip()
        is_inline_list = value.startswith("[") and value.endswith("]")
        if is_inline_list:
            # Parse simple list: [a, b, c]
            items = value[1:-1].split(",")
            parsed[name] = [item.strip().strip("'\"") for item in items if item.strip()]
        else:
            parsed[name] = value.strip("'\"")
    return parsed
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# API helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _api_get(path: str) -> dict | list | None:
    """GET ``path`` from the State Hub API and return the decoded JSON body.

    The path component always gets a trailing slash (inserted before any
    query string) so FastAPI does not answer with a redirect. Any failure is
    reported on stderr and results in ``None``; the request times out after
    10 seconds.
    """
    base, question_mark, query = f"{API_BASE}{path}".partition("?")
    if not base.endswith("/"):
        base += "/"
    url = f"{base}?{query}" if question_mark else base

    try:
        request = urllib.request.Request(url)
        with urllib.request.urlopen(request, timeout=10) as response:
            payload = response.read()
            return json.loads(payload)
    except Exception as e:
        print(f" GET {url} failed: {e}", file=sys.stderr)
        return None
|
|
|
|
|
|
def _api_post(path: str, body: dict) -> dict | None:
    """POST ``body`` as JSON to the State Hub API and return the decoded reply.

    Args:
        path: API path relative to ``API_BASE``; a trailing slash is appended
            when missing.
        body: Payload to send. Keys whose value is ``None`` are dropped before
            serialisation.

    Returns:
        The decoded JSON response, or ``None`` when the request fails. HTTP
        error responses are reported on stderr with their status code and the
        first 200 characters of the response body; every other failure is
        reported with the exception text. The request times out after
        10 seconds.
    """
    url = f"{API_BASE}{path}"
    if not url.endswith("/"):
        url += "/"
    data = json.dumps({k: v for k, v in body.items() if v is not None}).encode()
    try:
        req = urllib.request.Request(
            url,
            data=data,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=10) as resp:
            return json.loads(resp.read())
    except urllib.error.HTTPError as e:
        # The error body is not guaranteed to be UTF-8; a strict decode here
        # would raise from inside the handler and abort the whole run.
        body_text = e.read().decode("utf-8", errors="replace")[:200]
        print(f" POST {url} → {e.code}: {body_text}", file=sys.stderr)
        return None
    except Exception as e:
        print(f" POST {url} failed: {e}", file=sys.stderr)
        return None
|
|
|
|
|
|
def resolve_repo_path(repo: dict, override: str | None = None) -> str:
    """Pick the filesystem path to use for a repo record.

    Precedence: a truthy ``override``, then the entry for this machine's
    hostname in the record's ``host_paths`` mapping, then the record's
    ``local_path``. Returns an empty string when none of them is set.
    """
    if override:
        return override

    per_host = repo.get("host_paths") or {}
    candidates = (per_host.get(socket.gethostname()), repo.get("local_path"))
    for candidate in candidates:
        if candidate:
            return candidate
    return ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Ingest logic
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def ingest_repo(repo_slug: str, repo_path_override: str | None = None, dry_run: bool = False) -> int:
    """Ingest the capabilities declared in one repo's SCOPE.md.

    Looks the repo up in the State Hub, locates its SCOPE.md on this host,
    parses the capability blocks and creates a catalog entry for every
    declaration whose ``(capability_type, title)`` pair is not yet present
    for the repo's domain.

    Args:
        repo_slug: Slug of the registered repo.
        repo_path_override: Filesystem path to use instead of the path
            recorded for the repo.
        dry_run: When true, only print what would be created.

    Returns:
        The number of catalog entries created (or, in dry-run mode, that
        would be created). Entries that already exist and entries whose
        creation failed are not counted. ``0`` is returned whenever the repo,
        its path, its SCOPE.md, its domain or the existing catalog cannot be
        resolved.
    """
    repo = _api_get(f"/repos/{repo_slug}")
    # _api_get may also hand back a list; only a JSON object is a repo record.
    if not isinstance(repo, dict):
        print(f" Repo '{repo_slug}' not found in state-hub", file=sys.stderr)
        return 0

    repo_path = resolve_repo_path(repo, repo_path_override)
    if not repo_path:
        print(f" Repo '{repo_slug}' has no local path on this host", file=sys.stderr)
        return 0

    scope_path = Path(repo_path) / "SCOPE.md"
    if not scope_path.exists():
        print(f" {repo_slug}: no SCOPE.md at {scope_path}")
        return 0

    capabilities = parse_capabilities(scope_path)
    if not capabilities:
        print(f" {repo_slug}: no capability blocks in SCOPE.md")
        return 0

    # Resolve domain slug for this repo
    domain_slug = repo.get("domain_slug")
    if not domain_slug:
        # Fetch domain from repo's domain_id
        domains = _api_get("/domains/") or []
        domain_map = {
            d["id"]: d["slug"]
            for d in domains
            if isinstance(d, dict) and "id" in d and "slug" in d
        }
        domain_slug = domain_map.get(repo.get("domain_id"), "")
    if not domain_slug:
        print(f" {repo_slug}: cannot resolve domain slug", file=sys.stderr)
        return 0

    # Get existing catalog entries for this domain to avoid duplicates.
    # A failed lookup (None) is not the same as an empty catalog: without the
    # list every declaration would be posted again, so stop instead.
    existing = _api_get(f"/capability-catalog/?domain={domain_slug}&status=all")
    if existing is None:
        print(
            f" {repo_slug}: cannot list existing catalog entries; skipping to avoid duplicates",
            file=sys.stderr,
        )
        return 0
    existing_keys = {
        (e.get("capability_type"), e.get("title"))
        for e in existing
        if isinstance(e, dict)
    }

    count = 0
    for cap in capabilities:
        key = (cap["capability_type"], cap["title"])
        if key in existing_keys:
            print(f" {repo_slug}: skip (exists) {cap['capability_type']}/{cap['title']}")
            continue

        if dry_run:
            print(f" {repo_slug}: [dry-run] would create {cap['capability_type']}/{cap['title']}")
        else:
            result = _api_post("/capability-catalog", {
                "domain": domain_slug,
                "capability_type": cap["capability_type"],
                "title": cap["title"],
                "description": cap["description"],
                "keywords": cap["keywords"],
            })
            if not result:
                print(f" {repo_slug}: FAILED to create {cap['capability_type']}/{cap['title']}")
                # A failed POST is not an ingested entry; leave it uncounted.
                continue
            # The id may be an integer or a UUID string; stringify before
            # shortening it for display.
            short_id = str(result.get("id", "?"))[:8]
            print(f" {repo_slug}: created {cap['capability_type']}/{cap['title']} → {short_id}")

        # Remember the key so a block repeated in the same SCOPE.md is not
        # created (or reported) twice.
        existing_keys.add(key)
        count += 1

    return count
|
|
|
|
|
|
def ingest_all(dry_run: bool = False) -> None:
    """Run ``ingest_repo`` for every repo the API lists and print a grand total.

    Repo records without a slug are skipped. When the repo list cannot be
    fetched, nothing is ingested and the total is zero.
    """
    total = 0
    for entry in _api_get("/repos/") or []:
        slug = entry.get("slug", "")
        if slug:
            print(f"\n[{slug}]")
            total += ingest_repo(slug, dry_run=dry_run)

    qualifier = "would be " if dry_run else ""
    print(f"\nDone. {total} capability entries {qualifier}ingested.")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main():
    """Command-line entry point: parse the arguments and run the ingest.

    ``--all`` ingests every registered repo and takes precedence over
    ``--repo``; with neither given the parser exits with a usage error.
    ``--api-base`` replaces the module-level ``API_BASE`` for this run.
    """
    global API_BASE

    cli = argparse.ArgumentParser(description="Ingest capabilities from SCOPE.md into state-hub catalog")
    cli.add_argument("--repo", help="Repo slug to ingest")
    cli.add_argument("--repo-path", help="Override repo filesystem path")
    cli.add_argument("--all", action="store_true", help="Ingest from all registered repos")
    cli.add_argument("--dry-run", action="store_true", help="Print what would be ingested without writing")
    cli.add_argument("--api-base", help="Override API base URL")
    opts = cli.parse_args()

    if opts.api_base:
        API_BASE = opts.api_base.rstrip("/")

    if opts.all:
        ingest_all(dry_run=opts.dry_run)
        return

    if not opts.repo:
        # error() prints the usage message and exits, so nothing below runs.
        cli.error("Specify --repo <slug> or --all")

    print(f"[{opts.repo}]")
    count = ingest_repo(opts.repo, repo_path_override=opts.repo_path, dry_run=opts.dry_run)
    qualifier = "would be " if opts.dry_run else ""
    print(f"\nDone. {count} capability entries {qualifier}ingested.")
|
|
|
|
|
|
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    # main() returns None, so this exits with status 0 after a normal run.
    sys.exit(main())
|