"""Tests for the ``reuse_surface.hub_sync`` helpers and the ``hub sync`` CLI command."""

from __future__ import annotations

from pathlib import Path
from unittest.mock import patch

import yaml

from reuse_surface.hub_sync import (
    build_manifest,
    load_sources_manifest,
    merge_sources,
    registration_to_source,
    sources_from_hub_payload,
    write_sources_manifest,
)


def _reuse_surface_entry() -> dict:
    """Return a fresh hub entry describing the ``reuse-surface`` repo.

    A new dict is built on every call so tests never share mutable state.
    """
    return {
        "repo": "reuse-surface",
        "url": "https://example.com/reuse.yaml",
        "enabled": True,
        "required": True,
        "domain": "helix_forge",
    }


def test_registration_to_source_maps_fields():
    """A hub registration is mapped to a URL-based source without an ``index`` key."""
    registration = {
        "repo": "state-hub",
        "url": "https://example.com/capabilities.yaml",
        "enabled": True,
        "required": False,
        "domain": "helix_forge",
        "description": "test",
        "cache_ttl_seconds": 3600,
        "auth_env": "FEDERATION_TOKEN",
        "auth_header": "Authorization",
    }

    mapped = registration_to_source(registration)

    assert mapped["repo"] == "state-hub"
    assert mapped["url"].endswith("capabilities.yaml")
    assert mapped["cache_ttl_seconds"] == 3600
    assert mapped["auth_env"] == "FEDERATION_TOKEN"
    assert "index" not in mapped


def test_sources_from_hub_payload_skips_disabled_and_missing_url():
    """Only enabled entries that carry a URL survive the payload conversion."""
    hub_payload = {
        "repos": [
            {
                "repo": "reuse-surface",
                "url": "https://example.com/reuse.yaml",
                "enabled": True,
            },
            {
                "repo": "disabled",
                "url": "https://example.com/disabled.yaml",
                "enabled": False,
            },
            {"repo": "broken", "enabled": True},
        ]
    }

    kept_repos = [entry["repo"] for entry in sources_from_hub_payload(hub_payload)]

    assert kept_repos == ["reuse-surface"]


def test_merge_sources_keeps_local_index_sources():
    """Merging yields a URL entry for the hub repo and keeps the local-only index entry."""
    from_hub = [_reuse_surface_entry()]
    local_entries = [
        {
            "repo": "reuse-surface",
            "index": "registry/indexes/capabilities.yaml",
            "enabled": True,
            "required": True,
            "domain": "helix_forge",
        },
        {
            "repo": "state-hub",
            "index": "~/state-hub/registry/indexes/capabilities.yaml",
            "enabled": False,
            "required": False,
            "domain": "helix_forge",
        },
    ]

    combined = merge_sources(from_hub, local_entries)

    assert {entry["repo"] for entry in combined} == {"reuse-surface", "state-hub"}

    reuse_entry = next(
        entry for entry in combined if entry["repo"] == "reuse-surface"
    )
    assert "url" in reuse_entry

    state_hub_entry = next(
        entry for entry in combined if entry["repo"] == "state-hub"
    )
    assert "index" in state_hub_entry


def test_build_manifest_replace_vs_merge():
    """``merge=False`` keeps only hub sources; ``merge=True`` also keeps existing ones."""
    hub_payload = {"repos": [_reuse_surface_entry()]}
    current_manifest = {
        "version": 1,
        "domain": "helix_forge",
        "collision_policy": "warn",
        "sources": [
            {
                "repo": "state-hub",
                "index": "~/state-hub/registry/indexes/capabilities.yaml",
                "enabled": False,
                "required": False,
                "domain": "helix_forge",
            }
        ],
    }

    replaced = build_manifest(hub_payload, current_manifest, merge=False)
    replaced_repos = [entry["repo"] for entry in replaced["sources"]]
    assert replaced_repos == ["reuse-surface"]

    merged = build_manifest(hub_payload, current_manifest, merge=True)
    merged_repos = {entry["repo"] for entry in merged["sources"]}
    assert merged_repos == {"reuse-surface", "state-hub"}


def test_write_sources_manifest_round_trip(tmp_path: Path):
    """A written manifest loads back, and the loaded data matches the raw YAML on disk."""
    demo_source = {
        "repo": "demo",
        "url": "https://example.com/demo.yaml",
        "enabled": True,
        "required": False,
        "domain": "helix_forge",
    }
    manifest = {
        "version": 1,
        "domain": "helix_forge",
        "collision_policy": "warn",
        "sources": [demo_source],
    }
    target = tmp_path / "sources.yaml"

    write_sources_manifest(manifest, target)
    reloaded = load_sources_manifest(target)

    assert reloaded["sources"][0]["repo"] == "demo"
    # Parse the file independently of the loader to compare against it.
    assert yaml.safe_load(target.read_text(encoding="utf-8")) == reloaded


def test_cmd_hub_sync_dry_run(tmp_path, monkeypatch):
    """``hub sync --dry-run`` exits with 0 when the hub listing call is patched."""
    from reuse_surface.cli import main

    monkeypatch.setenv("REUSE_SURFACE_URL", "https://hub.example")
    hub_response = {"count": 1, "repos": [_reuse_surface_entry()]}

    # Replace the hub listing call so the test performs no network request.
    with patch("reuse_surface.hub_client.hub_list", return_value=(200, hub_response)):
        exit_code = main(["hub", "sync", "--dry-run"])

    assert exit_code == 0