generated from coulomb/repo-seed
Introduces TPSC for tracking external service dependencies with GDPR
compliance maturity (CNIL/IAPP CMMI scale), pricing model, ToS, and
data retention information across all repos.
Primary data:
- canon/tpsc/{openai,anthropic,gemini,openrouter}-api.yaml — service definitions
- tpsc.yaml in each repo (llm-connect seeded with 4 services)
State-hub additions:
- Migration j7e8f9a0b1c2: tpsc_catalog + tpsc_snapshots + tpsc_entries
- api/models/tpsc.py, api/schemas/tpsc.py, api/routers/tpsc.py
- /tpsc/catalog/, /tpsc/ingest/, /tpsc/snapshots/, /tpsc/report/gdpr endpoints
- 4 MCP tools: register_service, list_services, ingest_tpsc_tool, get_gdpr_report
- scripts/ingest_tpsc.py + make ingest-tpsc[/-all] targets
- Dashboard: tpsc.md page + docs/tpsc.md
GDPR maturity scale: unknown | non_compliant | initial | developing | defined | managed | certified
Warnings triggered at: unknown, non_compliant, initial
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
146 lines
4.2 KiB
Python
146 lines
4.2 KiB
Python
#!/usr/bin/env python3
|
|
"""Ingest tpsc.yaml service dependency declarations into the State Hub.
|
|
|
|
Usage:
|
|
uv run python scripts/ingest_tpsc.py --repo <slug> [--dry-run]
|
|
uv run python scripts/ingest_tpsc.py --all [--dry-run]
|
|
"""
|
|
import argparse
|
|
import json
|
|
import sys
|
|
import urllib.error
|
|
import urllib.request
|
|
from pathlib import Path
|
|
|
|
try:
    import yaml
except ImportError:
    # PyYAML is required to parse tpsc.yaml. Record its absence here so that
    # _load_yaml can fail with an actionable message when it is first needed.
    yaml = None
    try:
        # Kept only so the `_t` alias stays defined. tomllib does not exist
        # before Python 3.11, and its absence must not abort the script at
        # import time.
        import tomllib as _t  # noqa: F401
    except ImportError:
        _t = None

# Base URL that every State Hub API request path is appended to.
API_BASE = "http://127.0.0.1:8000"
# File name joined onto a repo's resolved local path to find its declarations.
TPSC_FILENAME = "tpsc.yaml"
|
|
|
|
|
|
def _get(path: str) -> dict | list:
    """Send a GET request to ``API_BASE + path`` and return the parsed JSON.

    Errors raised by ``urlopen`` or by JSON decoding are not handled here and
    propagate to the caller.
    """
    url = f"{API_BASE}{path}"
    request = urllib.request.Request(url, headers={"Accept": "application/json"})
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read())
|
|
|
|
|
|
def _post(path: str, payload: dict) -> dict:
    """POST *payload* as JSON to the State Hub and return the decoded reply.

    A trailing slash is appended to *path* before the request is sent.

    Args:
        path: API path relative to ``API_BASE``, without the trailing slash.
        payload: JSON-serialisable request body.

    Returns:
        The response body parsed as JSON.

    Raises:
        urllib.error.HTTPError: Re-raised after the status code and response
            body have been written to stderr.
    """
    data = json.dumps(payload).encode()
    req = urllib.request.Request(
        f"{API_BASE}{path}/",
        data=data,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(req) as r:
            return json.loads(r.read())
    except urllib.error.HTTPError as e:
        # Decode leniently: an error body that is not valid UTF-8 must not
        # raise UnicodeDecodeError here and hide the HTTP error being reported.
        body = e.read().decode("utf-8", errors="replace")
        print(f" ERROR {e.code}: {body}", file=sys.stderr)
        raise
|
|
|
|
|
|
def _load_yaml(path: Path) -> dict:
    """Parse the YAML file at *path* with ``yaml.safe_load``.

    Args:
        path: Location of the YAML file to read.

    Returns:
        The parsed document, or an empty dict when the document is empty.

    Raises:
        RuntimeError: If PyYAML could not be imported.
    """
    if yaml is None:
        raise RuntimeError("PyYAML is required: uv add pyyaml")
    # Read as UTF-8 explicitly; the platform default encoding would mis-decode
    # non-ASCII text on hosts whose locale is not UTF-8.
    with open(path, encoding="utf-8") as f:
        return yaml.safe_load(f) or {}
|
|
|
|
|
|
def _resolve_repo_path(repo: dict) -> str:
    """Return the first existing local directory recorded for *repo*.

    The entry of ``repo["host_paths"]`` keyed by this machine's hostname is
    tried first, then ``repo["local_path"]``. Each candidate has ``~``
    expanded and must be an existing directory. An empty string is returned
    when no candidate qualifies.
    """
    import socket

    this_host = socket.gethostname()
    per_host = repo.get("host_paths") or {}
    # Order matters: the host-specific path takes precedence over local_path.
    ordered_candidates = (per_host.get(this_host), repo.get("local_path"))
    for raw_path in ordered_candidates:
        if not raw_path:
            continue
        directory = Path(raw_path).expanduser()
        if directory.is_dir():
            return str(directory)
    return ""
|
|
|
|
|
|
def ingest_repo(slug: str, dry_run: bool = False) -> bool:
    """Ingest the tpsc.yaml of one repo into the State Hub.

    Fetches the repo record from ``/repos/<slug>``, resolves a local directory
    for it on this host, reads ``tpsc.yaml`` from that directory and POSTs the
    declared services to ``/tpsc/ingest``.

    Args:
        slug: Slug of the repo to ingest.
        dry_run: When true, print the parsed entries but do not POST them.

    Returns:
        True when the repo was ingested, or skipped because it has no
        ``tpsc.yaml`` or no services, or when running as a dry run. False
        when the repo lookup fails, no local path is accessible, the file
        does not have the expected structure, or the POST fails.
    """
    try:
        repo = _get(f"/repos/{slug}")
    except Exception as e:
        print(f" ✗ Repo '{slug}' not found: {e}", file=sys.stderr)
        return False

    if isinstance(repo, dict) and repo.get("error"):
        print(f" ✗ {repo['error']}", file=sys.stderr)
        return False

    repo_path = _resolve_repo_path(repo)
    if not repo_path:
        print(f" ✗ No accessible local path for '{slug}' on this host.", file=sys.stderr)
        return False

    tpsc_file = Path(repo_path) / TPSC_FILENAME
    if not tpsc_file.exists():
        print(f" — '{slug}': no {TPSC_FILENAME} found, skipping.")
        return True

    data = _load_yaml(tpsc_file)
    # A YAML document may be a list or scalar; report that instead of
    # crashing on the .get() below.
    if not isinstance(data, dict):
        print(f" ✗ '{slug}': {TPSC_FILENAME} must contain a mapping at the top level.", file=sys.stderr)
        return False

    services = data.get("services", [])
    if not services:
        print(f" — '{slug}': {TPSC_FILENAME} has no services entries, skipping.")
        return True
    if not isinstance(services, list):
        print(f" ✗ '{slug}': 'services' in {TPSC_FILENAME} must be a list.", file=sys.stderr)
        return False

    # Entries that are not mappings, or that lack a slug, are left out.
    entries = [
        {
            "service_slug": svc.get("slug", ""),
            "purpose": svc.get("purpose"),
            "auth_type": svc.get("auth"),
            "endpoint_override": svc.get("endpoint"),
            "notes": svc.get("notes"),
        }
        for svc in services
        if isinstance(svc, dict) and svc.get("slug")
    ]

    print(f" {'[dry-run] ' if dry_run else ''}'{slug}': {len(entries)} service(s) from {TPSC_FILENAME}")
    for entry in entries:
        # The keys always exist (possibly as None), so a .get() default would
        # never apply; fall back with `or` to avoid printing "None".
        print(f" • {entry['service_slug']} ({entry['auth_type'] or '?'}) — {entry['purpose'] or ''}")

    if dry_run:
        return True

    try:
        result = _post("/tpsc/ingest", {
            "repo_slug": slug,
            "source_file": TPSC_FILENAME,
            "entries": entries,
        })
    except OSError as exc:
        # urllib's HTTPError and URLError are OSError subclasses. Report the
        # failure through the return value, like the other failure paths, so
        # the caller can decide what to do next.
        print(f" ✗ '{slug}': ingest request failed: {exc}", file=sys.stderr)
        return False

    # str() keeps the slice valid even if the API returns a non-string id.
    print(f" ✓ Snapshot {str(result['id'])[:8]}… ingested {result['entry_count']} entries")
    return True
|
|
|
|
|
|
def main() -> None:
    """Parse the command line, ingest the selected repos, and exit.

    Exactly one of ``--repo SLUG`` or ``--all`` must be given. ``--dry-run``
    parses and prints without POSTing. The process exits with status 0 when
    every repo succeeded and 1 otherwise.
    """
    parser = argparse.ArgumentParser(description="Ingest tpsc.yaml into State Hub")
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--repo", metavar="SLUG", help="Single repo slug")
    group.add_argument("--all", action="store_true", help="All registered repos")
    parser.add_argument("--dry-run", action="store_true", help="Parse only, do not POST")
    args = parser.parse_args()

    if args.all:
        try:
            repos = _get("/repos/")
        except (OSError, ValueError) as exc:
            # Covers an unreachable API (URLError is an OSError) and a reply
            # that is not valid JSON; exit non-zero without a traceback.
            print(f" ✗ Could not list repos: {exc}", file=sys.stderr)
            sys.exit(1)
        slugs = [r["slug"] for r in repos]
    else:
        slugs = [args.repo]

    # Collect every result before aggregating: all() over a generator stops at
    # the first False, which would leave all later repos unprocessed.
    results = [ingest_repo(slug, dry_run=args.dry_run) for slug in slugs]
    sys.exit(0 if all(results) else 1)


if __name__ == "__main__":
    main()
|