generated from coulomb/repo-seed
Some checks failed
ci / validate-registry (push) Has been cancelled
Use reuse.coulomb.social, REUSE_SURFACE_URL/TOKEN env vars, reuse-surface image and reuse-surface-env secret. Replace reuse-surface-hub entrypoint with reuse-surface serve; CLI uses --base-url.
133 lines
3.6 KiB
Python
133 lines
3.6 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
from pathlib import Path
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
from fastapi.testclient import TestClient
|
|
|
|
from reuse_surface.hub.app import create_app
|
|
from reuse_surface.hub.store import HubStore
|
|
|
|
REMOTE_INDEX = """
|
|
version: 1
|
|
domain: helix_forge
|
|
updated: "2026-06-15"
|
|
capabilities:
|
|
- id: capability.remote.sample
|
|
name: Remote Sample
|
|
domain: helix_forge
|
|
vector: D2/A0/C0/R0
|
|
owner: example
|
|
path: registry/capabilities/capability.remote.sample.md
|
|
summary: Sample capability from a remote index
|
|
tags: [sample]
|
|
consumption_modes: [planning]
|
|
"""
|
|
|
|
|
|
@pytest.fixture
|
|
def hub_client(tmp_path, monkeypatch):
|
|
db_path = tmp_path / "hub.db"
|
|
cache_dir = tmp_path / "cache"
|
|
monkeypatch.setenv("REUSE_SURFACE_TOKEN", "test-token")
|
|
monkeypatch.setenv("REUSE_SURFACE_DB", str(db_path))
|
|
monkeypatch.setenv("REUSE_SURFACE_CACHE_DIR", str(cache_dir))
|
|
app = create_app()
|
|
with TestClient(app) as client:
|
|
yield client
|
|
|
|
|
|
def test_health(hub_client):
|
|
response = hub_client.get("/health")
|
|
assert response.status_code == 200
|
|
assert response.json()["status"] == "ok"
|
|
|
|
|
|
def test_register_requires_auth(hub_client):
|
|
response = hub_client.post(
|
|
"/v1/repos",
|
|
json={
|
|
"repo": "demo",
|
|
"url": "https://example.com/capabilities.yaml",
|
|
"domain": "helix_forge",
|
|
},
|
|
)
|
|
assert response.status_code == 401
|
|
|
|
|
|
def test_register_and_list(hub_client):
|
|
payload = {
|
|
"repo": "demo",
|
|
"url": "https://example.com/capabilities.yaml",
|
|
"domain": "helix_forge",
|
|
"description": "test",
|
|
}
|
|
response = hub_client.post(
|
|
"/v1/repos",
|
|
json=payload,
|
|
headers={"Authorization": "Bearer test-token"},
|
|
)
|
|
assert response.status_code == 201
|
|
listed = hub_client.get("/v1/repos")
|
|
assert listed.status_code == 200
|
|
assert listed.json()["count"] == 1
|
|
assert "auth_env" not in listed.json()["repos"][0]
|
|
|
|
|
|
def test_update_registration(hub_client):
|
|
hub_client.post(
|
|
"/v1/repos",
|
|
json={
|
|
"repo": "demo",
|
|
"url": "https://example.com/capabilities.yaml",
|
|
"domain": "helix_forge",
|
|
},
|
|
headers={"Authorization": "Bearer test-token"},
|
|
)
|
|
response = hub_client.patch(
|
|
"/v1/repos/demo",
|
|
json={"enabled": False},
|
|
headers={"Authorization": "Bearer test-token"},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json()["enabled"] is False
|
|
|
|
|
|
def test_compose_federated_with_mock_fetch(hub_client, monkeypatch):
|
|
hub_client.post(
|
|
"/v1/repos",
|
|
json={
|
|
"repo": "remote-repo",
|
|
"url": "https://example.com/capabilities.yaml",
|
|
"domain": "helix_forge",
|
|
"enabled": True,
|
|
},
|
|
headers={"Authorization": "Bearer test-token"},
|
|
)
|
|
payload = REMOTE_INDEX.encode("utf-8")
|
|
|
|
class FakeResponse:
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, *args):
|
|
return False
|
|
|
|
def read(self):
|
|
return payload
|
|
|
|
with patch("urllib.request.urlopen", return_value=FakeResponse()):
|
|
response = hub_client.get("/v1/federated?refresh=true")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
ids = {item["id"] for item in data["capabilities"]}
|
|
assert "capability.remote.sample" in ids
|
|
|
|
|
|
def test_store_validation(tmp_path):
|
|
store = HubStore(tmp_path / "hub.db")
|
|
with pytest.raises(ValueError):
|
|
store.create_repo({"repo": "BAD", "url": "ftp://x", "domain": "helix_forge"}) |