Files
the-custodian/state-hub/scripts/ingest_capabilities.py
tegwick f85c5e4d49 feat(capability-requests): add cross-domain capability catalog and request routing
Introduces a capability catalog (CUST-WP-0022) so domains can advertise what
they provide and agents can request capabilities from other domains with
auto-routing, lifecycle tracking, and task-unblocking on completion.

- New models: CapabilityCatalog, CapabilityRequest with full lifecycle
  (requested → accepted → in_progress → ready_for_review → completed/rejected/withdrawn)
- Migration i6d7e8f9a0b1: capability_catalog + capability_requests tables
- Router /capability-catalog and /capability-requests with accept/status endpoints
- 7 new MCP tools: register_capability, list_capabilities, request_capability,
  accept_capability_request, update_capability_request_status,
  list_capability_requests, get_capability_request
- StateSummary gains open_capability_requests count
- Dashboard: capability-requests.md page + docs/capabilities.md + docs/scope.md
- SCOPE.md: three seed capabilities documented (MCP registration, state tracking, SBOM)
- scope.template: Provided Capabilities section with example block
- scripts/ingest_capabilities.py + make ingest-capabilities[/-all] targets

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
2026-03-19 21:07:50 +01:00

254 lines
8.9 KiB
Python

#!/usr/bin/env python3
"""Ingest capability declarations from SCOPE.md files into the State Hub catalog.

Reads the ```capability blocks declared in SCOPE.md files of registered repos
(conventionally under the ``## Provided Capabilities`` section) and adds any
that are missing to the capability_catalog table via the API. Entries that
already exist for the domain (same type and title) are left untouched.

Usage:
    python ingest_capabilities.py --repo <slug> [--repo-path <path>] [--dry-run] [--api-base <url>]
    python ingest_capabilities.py --all [--dry-run] [--api-base <url>]

Capability blocks in SCOPE.md use this format:

    ```capability
    type: infrastructure
    title: Cluster provisioning
    description: Provision k8s clusters for any domain.
    keywords: [cluster, k8s, privacy, instance]
    ```

Follows ADR-001: SCOPE.md files are the origin of truth; the DB catalog is
a derived index that can be fully reconstructed from repo files.
"""
from __future__ import annotations
import argparse
import json
import os
import re
import socket
import sys
import urllib.error
import urllib.request
from pathlib import Path
try:
import yaml
_YAML_AVAILABLE = True
except ImportError:
_YAML_AVAILABLE = False
# Base URL of the State Hub API, read from the API_BASE environment variable
# (default http://127.0.0.1:8000). The trailing slash is stripped so request
# paths such as "/repos/" can be appended directly.
API_BASE = os.environ.get("API_BASE", "http://127.0.0.1:8000").rstrip("/")
# ---------------------------------------------------------------------------
# SCOPE.md parser
# ---------------------------------------------------------------------------
# Matches a fenced ```capability block and captures the text between the
# opening fence line and the closing ``` fence. DOTALL lets the non-greedy
# body span multiple lines.
_CAPABILITY_BLOCK_RE = re.compile(r"```capability\s*\n(.*?)\n```", re.DOTALL)
def parse_capabilities(scope_path: Path) -> list[dict]:
    """Extract capability blocks from a SCOPE.md file.

    Every fenced ```capability block is parsed as a small key/value mapping.
    Blocks that lack a ``type`` or a ``title`` are skipped.

    Args:
        scope_path: Path to the SCOPE.md file to read.

    Returns:
        One dict per valid block with the keys ``capability_type``, ``title``,
        ``description`` (empty string when absent or empty) and ``keywords``
        (always a list). An empty list when the file does not exist or
        declares no valid block.
    """
    if not scope_path.exists():
        return []

    # Read as UTF-8 explicitly so the result does not depend on the host locale.
    text = scope_path.read_text(encoding="utf-8")

    capabilities = []
    for block in _CAPABILITY_BLOCK_RE.findall(text):
        cap = _parse_yaml_block(block)
        if not (cap.get("type") and cap.get("title")):
            continue

        # ``keywords:`` with no value parses to None, and a bare scalar such as
        # ``keywords: k8s, cluster`` parses to a string; normalise both to a list.
        keywords = cap.get("keywords") or []
        if isinstance(keywords, str):
            keywords = [word.strip() for word in keywords.split(",") if word.strip()]

        capabilities.append({
            "capability_type": cap["type"],
            "title": cap["title"],
            # ``description:`` with no value parses to None; treat it like a missing key.
            "description": cap.get("description") or "",
            "keywords": keywords,
        })
    return capabilities
def _parse_yaml_block(text: str) -> dict:
    """Parse the body of a capability block into a dict.

    PyYAML is used when it is importable and the block parses to a mapping.
    Otherwise (PyYAML missing, malformed YAML, or a non-mapping result) a
    minimal line-based ``key: value`` parser is used.

    Args:
        text: Raw text found between the capability fences.

    Returns:
        The parsed mapping; an empty dict when nothing could be parsed.
    """
    if _YAML_AVAILABLE:
        try:
            parsed = yaml.safe_load(text)
        except yaml.YAMLError:
            # Best effort: malformed YAML is retried with the line parser below.
            parsed = None
        if isinstance(parsed, dict):
            return parsed

    # Fallback: manual key: value parsing
    result = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        # Blank lines and YAML comment lines carry no data; without this check
        # a comment such as "# note: ..." would be stored as a key.
        if not line or line.startswith("#"):
            continue
        key, sep, val = line.partition(":")
        key = key.strip()
        if not sep or not key:
            continue
        val = val.strip()
        if val.startswith("[") and val.endswith("]"):
            # Parse simple inline list: [a, b, c]
            inner = val[1:-1]
            result[key] = [v.strip().strip("'\"") for v in inner.split(",") if v.strip()]
        else:
            result[key] = val.strip("'\"")
    return result
# ---------------------------------------------------------------------------
# API helpers
# ---------------------------------------------------------------------------
def _api_get(path: str) -> dict | list | None:
    """GET *path* from the State Hub API and return the decoded JSON.

    A trailing slash is inserted at the end of the path portion (before any
    query string) so FastAPI does not answer with a redirect. Any failure is
    reported on stderr and yields ``None``.
    """
    endpoint, question_mark, query = f"{API_BASE}{path}".partition("?")
    if not endpoint.endswith("/"):
        endpoint += "/"
    # question_mark and query are both empty when the path has no query string
    url = f"{endpoint}{question_mark}{query}"

    try:
        request = urllib.request.Request(url)
        with urllib.request.urlopen(request, timeout=10) as response:
            payload = response.read()
            return json.loads(payload)
    except Exception as e:
        print(f" GET {url} failed: {e}", file=sys.stderr)
    return None
def _api_post(path: str, body: dict) -> dict | None:
    """POST *body* as JSON to the State Hub API.

    Args:
        path: API path appended to ``API_BASE``; a trailing slash is added
            when missing.
        body: Payload fields. Entries whose value is ``None`` are left out of
            the request.

    Returns:
        The decoded JSON response, or ``None`` when the request fails (the
        failure is reported on stderr).
    """
    url = f"{API_BASE}{path}"
    if not url.endswith("/"):
        url += "/"
    data = json.dumps({k: v for k, v in body.items() if v is not None}).encode()
    try:
        # Supplying ``data`` makes urllib issue a POST request.
        req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"})
        with urllib.request.urlopen(req, timeout=10) as resp:
            return json.loads(resp.read())
    except urllib.error.HTTPError as e:
        # errors="replace": a non-UTF-8 error body must not hide the HTTP error itself.
        body_text = e.read().decode("utf-8", errors="replace")[:200]
        # Keep the URL and the status code visually separate in the message.
        print(f" POST {url} -> {e.code}: {body_text}", file=sys.stderr)
        return None
    except Exception as e:
        print(f" POST {url} failed: {e}", file=sys.stderr)
        return None
def resolve_repo_path(repo: dict, override: str | None = None) -> str:
    """Return the filesystem path of *repo* on the current host.

    Precedence: a non-empty *override*, then the ``host_paths`` entry keyed by
    this machine's hostname, then ``local_path``. An empty string is returned
    when none of them is set.
    """
    if not override:
        per_host = repo.get("host_paths") or {}
        candidates = (per_host.get(socket.gethostname()), repo.get("local_path"))
        for candidate in candidates:
            if candidate:
                return candidate
        return ""
    return override
# ---------------------------------------------------------------------------
# Ingest logic
# ---------------------------------------------------------------------------
def _resolve_domain_slug(repo: dict) -> str:
    """Return the domain slug of *repo*, looking it up via ``domain_id`` if needed."""
    slug = repo.get("domain_slug")
    if slug:
        return slug
    domain_id = repo.get("domain_id")
    if domain_id is None:
        return ""
    # Fetch the domain list and match on the repo's domain_id
    domains = _api_get("/domains/") or []
    for domain in domains:
        if isinstance(domain, dict) and domain.get("id") == domain_id:
            return domain.get("slug") or ""
    return ""


def ingest_repo(repo_slug: str, repo_path_override: str | None = None, dry_run: bool = False) -> int:
    """Ingest capabilities from one repo's SCOPE.md into the catalog.

    Capabilities already present in the domain's catalog (same type and title)
    are skipped. Progress is printed to stdout and problems to stderr.

    Args:
        repo_slug: Slug of the repo as registered in the State Hub.
        repo_path_override: Filesystem path to use instead of the registered one.
        dry_run: When true, only report what would be created.

    Returns:
        The number of capabilities created, or that would be created in a dry
        run. Skipped and failed entries are not counted.
    """
    repo = _api_get(f"/repos/{repo_slug}")
    if not isinstance(repo, dict):
        print(f" Repo '{repo_slug}' not found in state-hub", file=sys.stderr)
        return 0

    repo_path = resolve_repo_path(repo, repo_path_override)
    if not repo_path:
        print(f" Repo '{repo_slug}' has no local path on this host", file=sys.stderr)
        return 0

    scope_path = Path(repo_path) / "SCOPE.md"
    if not scope_path.exists():
        print(f" {repo_slug}: no SCOPE.md at {scope_path}")
        return 0

    capabilities = parse_capabilities(scope_path)
    if not capabilities:
        print(f" {repo_slug}: no capability blocks in SCOPE.md")
        return 0

    domain_slug = _resolve_domain_slug(repo)
    if not domain_slug:
        print(f" {repo_slug}: cannot resolve domain slug", file=sys.stderr)
        return 0

    # Get existing catalog entries for this domain to avoid duplicates. A failed
    # lookup (None) must not be mistaken for an empty catalog, otherwise every
    # capability would be created again.
    existing = _api_get(f"/capability-catalog/?domain={domain_slug}&status=all")
    if existing is None:
        print(f" {repo_slug}: cannot read existing catalog entries; skipping to avoid duplicates",
              file=sys.stderr)
        return 0
    known_keys = {
        (entry.get("capability_type"), entry.get("title"))
        for entry in existing
        if isinstance(entry, dict)
    }

    count = 0
    for cap in capabilities:
        key = (cap["capability_type"], cap["title"])
        label = f"{cap['capability_type']}/{cap['title']}"
        if key in known_keys:
            print(f" {repo_slug}: skip (exists) {label}")
            continue

        if dry_run:
            print(f" {repo_slug}: [dry-run] would create {label}")
        else:
            result = _api_post("/capability-catalog", {
                "domain": domain_slug,
                "capability_type": cap["capability_type"],
                "title": cap["title"],
                "description": cap["description"],
                "keywords": cap["keywords"],
            })
            if not result:
                print(f" {repo_slug}: FAILED to create {label}")
                continue
            # The id may not be a string; convert before truncating for display.
            short_id = str(result.get("id", "?"))[:8]
            print(f" {repo_slug}: created {label} -> {short_id}")

        # Remember the key so a block repeated in the same SCOPE.md is handled once.
        known_keys.add(key)
        count += 1
    return count
def ingest_all(dry_run: bool = False) -> None:
    """Run the ingest for every repo registered in the State Hub.

    Repos without a slug are ignored. A summary line with the combined count
    is printed at the end.
    """
    registered = _api_get("/repos/") or []
    ingested = 0
    for entry in registered:
        repo_slug = entry.get("slug", "")
        if repo_slug:
            print(f"\n[{repo_slug}]")
            ingested += ingest_repo(repo_slug, dry_run=dry_run)
    print(f"\nDone. {ingested} capability entries {'would be ' if dry_run else ''}ingested.")
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
    """Parse the command line and run a single-repo or all-repo ingest."""
    global API_BASE

    parser = argparse.ArgumentParser(description="Ingest capabilities from SCOPE.md into state-hub catalog")
    parser.add_argument("--repo", help="Repo slug to ingest")
    parser.add_argument("--repo-path", help="Override repo filesystem path")
    parser.add_argument("--all", action="store_true", help="Ingest from all registered repos")
    parser.add_argument("--dry-run", action="store_true", help="Print what would be ingested without writing")
    parser.add_argument("--api-base", help="Override API base URL")
    args = parser.parse_args()

    # A command-line override replaces the module-level base URL
    if args.api_base:
        API_BASE = args.api_base.rstrip("/")

    # parser.error() prints the usage message and exits, so nothing below runs
    if not (args.all or args.repo):
        parser.error("Specify --repo <slug> or --all")

    # --all takes precedence when both selectors are given
    if args.all:
        ingest_all(dry_run=args.dry_run)
        return

    print(f"[{args.repo}]")
    count = ingest_repo(args.repo, repo_path_override=args.repo_path, dry_run=args.dry_run)
    print(f"\nDone. {count} capability entries {'would be ' if args.dry_run else ''}ingested.")
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()