generated from coulomb/repo-seed
Some checks failed
ci / validate-registry (push) Has been cancelled
Add hub sync and report cohorts CLI commands with pytest coverage, document sibling index publish contract and hub hardening path, align INTENT layout, raise external evidence on three registry entries, and close gap priorities 19-23 (priority 18 deferred on sibling index blocks).
165 lines
4.9 KiB
Python
165 lines
4.9 KiB
Python
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
from unittest.mock import patch
|
|
|
|
import yaml
|
|
|
|
from reuse_surface.hub_sync import (
|
|
build_manifest,
|
|
load_sources_manifest,
|
|
merge_sources,
|
|
registration_to_source,
|
|
sources_from_hub_payload,
|
|
write_sources_manifest,
|
|
)
|
|
|
|
|
|
def test_registration_to_source_maps_fields():
|
|
source = registration_to_source(
|
|
{
|
|
"repo": "state-hub",
|
|
"url": "https://example.com/capabilities.yaml",
|
|
"enabled": True,
|
|
"required": False,
|
|
"domain": "helix_forge",
|
|
"description": "test",
|
|
"cache_ttl_seconds": 3600,
|
|
"auth_env": "FEDERATION_TOKEN",
|
|
"auth_header": "Authorization",
|
|
}
|
|
)
|
|
assert source["repo"] == "state-hub"
|
|
assert source["url"].endswith("capabilities.yaml")
|
|
assert source["cache_ttl_seconds"] == 3600
|
|
assert source["auth_env"] == "FEDERATION_TOKEN"
|
|
assert "index" not in source
|
|
|
|
|
|
def test_sources_from_hub_payload_skips_disabled_and_missing_url():
|
|
payload = {
|
|
"repos": [
|
|
{
|
|
"repo": "reuse-surface",
|
|
"url": "https://example.com/reuse.yaml",
|
|
"enabled": True,
|
|
},
|
|
{"repo": "disabled", "url": "https://example.com/disabled.yaml", "enabled": False},
|
|
{"repo": "broken", "enabled": True},
|
|
]
|
|
}
|
|
sources = sources_from_hub_payload(payload)
|
|
assert [item["repo"] for item in sources] == ["reuse-surface"]
|
|
|
|
|
|
def test_merge_sources_keeps_local_index_sources():
|
|
hub_sources = [
|
|
{
|
|
"repo": "reuse-surface",
|
|
"url": "https://example.com/reuse.yaml",
|
|
"enabled": True,
|
|
"required": True,
|
|
"domain": "helix_forge",
|
|
}
|
|
]
|
|
existing_sources = [
|
|
{
|
|
"repo": "reuse-surface",
|
|
"index": "registry/indexes/capabilities.yaml",
|
|
"enabled": True,
|
|
"required": True,
|
|
"domain": "helix_forge",
|
|
},
|
|
{
|
|
"repo": "state-hub",
|
|
"index": "~/state-hub/registry/indexes/capabilities.yaml",
|
|
"enabled": False,
|
|
"required": False,
|
|
"domain": "helix_forge",
|
|
},
|
|
]
|
|
merged = merge_sources(hub_sources, existing_sources)
|
|
repos = {item["repo"] for item in merged}
|
|
assert repos == {"reuse-surface", "state-hub"}
|
|
reuse = next(item for item in merged if item["repo"] == "reuse-surface")
|
|
assert "url" in reuse
|
|
state_hub = next(item for item in merged if item["repo"] == "state-hub")
|
|
assert "index" in state_hub
|
|
|
|
|
|
def test_build_manifest_replace_vs_merge():
|
|
payload = {
|
|
"repos": [
|
|
{
|
|
"repo": "reuse-surface",
|
|
"url": "https://example.com/reuse.yaml",
|
|
"enabled": True,
|
|
"required": True,
|
|
"domain": "helix_forge",
|
|
}
|
|
]
|
|
}
|
|
existing = {
|
|
"version": 1,
|
|
"domain": "helix_forge",
|
|
"collision_policy": "warn",
|
|
"sources": [
|
|
{
|
|
"repo": "state-hub",
|
|
"index": "~/state-hub/registry/indexes/capabilities.yaml",
|
|
"enabled": False,
|
|
"required": False,
|
|
"domain": "helix_forge",
|
|
}
|
|
],
|
|
}
|
|
replaced = build_manifest(payload, existing, merge=False)
|
|
assert [item["repo"] for item in replaced["sources"]] == ["reuse-surface"]
|
|
merged = build_manifest(payload, existing, merge=True)
|
|
assert {item["repo"] for item in merged["sources"]} == {
|
|
"reuse-surface",
|
|
"state-hub",
|
|
}
|
|
|
|
|
|
def test_write_sources_manifest_round_trip(tmp_path: Path):
|
|
manifest = {
|
|
"version": 1,
|
|
"domain": "helix_forge",
|
|
"collision_policy": "warn",
|
|
"sources": [
|
|
{
|
|
"repo": "demo",
|
|
"url": "https://example.com/demo.yaml",
|
|
"enabled": True,
|
|
"required": False,
|
|
"domain": "helix_forge",
|
|
}
|
|
],
|
|
}
|
|
path = tmp_path / "sources.yaml"
|
|
write_sources_manifest(manifest, path)
|
|
loaded = load_sources_manifest(path)
|
|
assert loaded["sources"][0]["repo"] == "demo"
|
|
assert yaml.safe_load(path.read_text(encoding="utf-8")) == loaded
|
|
|
|
|
|
def test_cmd_hub_sync_dry_run(tmp_path, monkeypatch):
|
|
from reuse_surface.cli import main
|
|
|
|
monkeypatch.setenv("REUSE_SURFACE_URL", "https://hub.example")
|
|
payload = {
|
|
"count": 1,
|
|
"repos": [
|
|
{
|
|
"repo": "reuse-surface",
|
|
"url": "https://example.com/reuse.yaml",
|
|
"enabled": True,
|
|
"required": True,
|
|
"domain": "helix_forge",
|
|
}
|
|
],
|
|
}
|
|
with patch("reuse_surface.hub_client.hub_list", return_value=(200, payload)):
|
|
exit_code = main(["hub", "sync", "--dry-run"])
|
|
assert exit_code == 0 |