Files
reuse-surface/tests/test_hub_sync.py
tegwick 270065ff58
Some checks failed
ci / validate-registry (push) Has been cancelled
Implement REUSE-WP-0012 federation scale and intent alignment
Add hub sync and report cohorts CLI commands with pytest coverage, document
sibling index publish contract and hub hardening path, align INTENT layout,
raise external evidence on three registry entries, and close gap priorities
19-23 (priority 18 deferred on sibling index blocks).
2026-06-16 00:42:50 +02:00

165 lines
4.9 KiB
Python

from __future__ import annotations
from pathlib import Path
from unittest.mock import patch
import yaml
from reuse_surface.hub_sync import (
build_manifest,
load_sources_manifest,
merge_sources,
registration_to_source,
sources_from_hub_payload,
write_sources_manifest,
)
def test_registration_to_source_maps_fields():
source = registration_to_source(
{
"repo": "state-hub",
"url": "https://example.com/capabilities.yaml",
"enabled": True,
"required": False,
"domain": "helix_forge",
"description": "test",
"cache_ttl_seconds": 3600,
"auth_env": "FEDERATION_TOKEN",
"auth_header": "Authorization",
}
)
assert source["repo"] == "state-hub"
assert source["url"].endswith("capabilities.yaml")
assert source["cache_ttl_seconds"] == 3600
assert source["auth_env"] == "FEDERATION_TOKEN"
assert "index" not in source
def test_sources_from_hub_payload_skips_disabled_and_missing_url():
payload = {
"repos": [
{
"repo": "reuse-surface",
"url": "https://example.com/reuse.yaml",
"enabled": True,
},
{"repo": "disabled", "url": "https://example.com/disabled.yaml", "enabled": False},
{"repo": "broken", "enabled": True},
]
}
sources = sources_from_hub_payload(payload)
assert [item["repo"] for item in sources] == ["reuse-surface"]
def test_merge_sources_keeps_local_index_sources():
hub_sources = [
{
"repo": "reuse-surface",
"url": "https://example.com/reuse.yaml",
"enabled": True,
"required": True,
"domain": "helix_forge",
}
]
existing_sources = [
{
"repo": "reuse-surface",
"index": "registry/indexes/capabilities.yaml",
"enabled": True,
"required": True,
"domain": "helix_forge",
},
{
"repo": "state-hub",
"index": "~/state-hub/registry/indexes/capabilities.yaml",
"enabled": False,
"required": False,
"domain": "helix_forge",
},
]
merged = merge_sources(hub_sources, existing_sources)
repos = {item["repo"] for item in merged}
assert repos == {"reuse-surface", "state-hub"}
reuse = next(item for item in merged if item["repo"] == "reuse-surface")
assert "url" in reuse
state_hub = next(item for item in merged if item["repo"] == "state-hub")
assert "index" in state_hub
def test_build_manifest_replace_vs_merge():
payload = {
"repos": [
{
"repo": "reuse-surface",
"url": "https://example.com/reuse.yaml",
"enabled": True,
"required": True,
"domain": "helix_forge",
}
]
}
existing = {
"version": 1,
"domain": "helix_forge",
"collision_policy": "warn",
"sources": [
{
"repo": "state-hub",
"index": "~/state-hub/registry/indexes/capabilities.yaml",
"enabled": False,
"required": False,
"domain": "helix_forge",
}
],
}
replaced = build_manifest(payload, existing, merge=False)
assert [item["repo"] for item in replaced["sources"]] == ["reuse-surface"]
merged = build_manifest(payload, existing, merge=True)
assert {item["repo"] for item in merged["sources"]} == {
"reuse-surface",
"state-hub",
}
def test_write_sources_manifest_round_trip(tmp_path: Path):
manifest = {
"version": 1,
"domain": "helix_forge",
"collision_policy": "warn",
"sources": [
{
"repo": "demo",
"url": "https://example.com/demo.yaml",
"enabled": True,
"required": False,
"domain": "helix_forge",
}
],
}
path = tmp_path / "sources.yaml"
write_sources_manifest(manifest, path)
loaded = load_sources_manifest(path)
assert loaded["sources"][0]["repo"] == "demo"
assert yaml.safe_load(path.read_text(encoding="utf-8")) == loaded
def test_cmd_hub_sync_dry_run(tmp_path, monkeypatch):
from reuse_surface.cli import main
monkeypatch.setenv("REUSE_SURFACE_URL", "https://hub.example")
payload = {
"count": 1,
"repos": [
{
"repo": "reuse-surface",
"url": "https://example.com/reuse.yaml",
"enabled": True,
"required": True,
"domain": "helix_forge",
}
],
}
with patch("reuse_surface.hub_client.hub_list", return_value=(200, payload)):
exit_code = main(["hub", "sync", "--dry-run"])
assert exit_code == 0