# Compose a single federated capability index from the per-repository
# capability indexes listed in the federation manifest.
from __future__ import annotations

import sys
from datetime import date
from pathlib import Path
from typing import Any

import yaml
from jsonschema import Draft202012Validator

from reuse_surface.registry import ROOT

MANIFEST_PATH = ROOT / "registry" / "federation" / "sources.yaml"
SCHEMA_PATH = ROOT / "schemas" / "federation.schema.yaml"
FEDERATED_INDEX_PATH = ROOT / "registry" / "indexes" / "federated.yaml"


def _expand_path(index_path: str) -> Path:
    """Return *index_path* as a ``Path`` with a leading ``~`` expanded."""
    return Path(index_path).expanduser()


def load_federation_manifest(path: Path | None = None) -> dict[str, Any]:
    """Load the federation manifest and validate it against the schema.

    Args:
        path: Manifest file to read; ``MANIFEST_PATH`` is used when omitted.

    Returns:
        The parsed manifest.

    Raises:
        ValueError: If the manifest violates the schema at ``SCHEMA_PATH``.
            All validation messages are joined into the error text.
    """
    manifest_path = path or MANIFEST_PATH
    with manifest_path.open(encoding="utf-8") as handle:
        manifest = yaml.safe_load(handle)

    schema = yaml.safe_load(SCHEMA_PATH.read_text(encoding="utf-8"))
    validator = Draft202012Validator(schema)
    # Sort by location in the document so the message order is stable.
    errors = sorted(validator.iter_errors(manifest), key=lambda err: err.path)
    if errors:
        messages = "; ".join(error.message for error in errors)
        raise ValueError(f"invalid federation manifest: {messages}")
    return manifest


def _resolve_index_path(index_value: str) -> Path:
    """Turn a manifest ``index`` value into a filesystem path.

    ``~`` is expanded first; a path that is still relative is then anchored
    at ``ROOT`` and resolved. Absolute paths are returned as-is.
    """
    path = _expand_path(index_value)
    if not path.is_absolute():
        path = (ROOT / path).resolve()
    return path


def compose_federated_index(
    manifest: dict[str, Any] | None = None,
) -> tuple[dict[str, Any], list[str]]:
    """Merge the capability indexes of all enabled sources.

    Sources are processed in manifest order. When a capability id was already
    contributed by an earlier source, the later entry is skipped, a warning is
    recorded, and it does not count towards that source's ``count``.

    Args:
        manifest: Federation manifest to compose from. A missing or falsy
            value falls back to ``load_federation_manifest()``.

    Returns:
        A ``(federated, warnings)`` pair: the composed index document and the
        human-readable warnings collected while composing it.

    Raises:
        FileNotFoundError: If the index file of a source marked ``required``
            does not exist.
    """
    manifest = manifest or load_federation_manifest()
    warnings: list[str] = []
    merged: list[dict[str, Any]] = []
    seen_ids: dict[str, str] = {}  # capability id -> repo that provided it first
    source_summaries: list[dict[str, Any]] = []

    for source in manifest["sources"]:
        if not source.get("enabled", False):
            continue

        index_path = _resolve_index_path(source["index"])
        if not index_path.exists():
            message = f"missing index for {source['repo']}: {index_path}"
            if source.get("required", False):
                raise FileNotFoundError(message)
            warnings.append(message)
            continue

        with index_path.open(encoding="utf-8") as handle:
            # safe_load returns None for an empty document; treat that as an
            # index with no capabilities instead of failing on ``.get``.
            index_data = yaml.safe_load(handle) or {}

        count = 0
        # ``capabilities: null`` is likewise treated as an empty list.
        for item in index_data.get("capabilities") or []:
            cap_id = item["id"]
            if cap_id in seen_ids:
                warnings.append(
                    f"duplicate id {cap_id}: {seen_ids[cap_id]} and {source['repo']}"
                )
            else:
                seen_ids[cap_id] = source["repo"]
                # Copy so the provenance keys are not written into the
                # loaded index data.
                federated_item = dict(item)
                federated_item["source_repo"] = source["repo"]
                federated_item["source_index"] = source["index"]
                merged.append(federated_item)
                count += 1

        source_summaries.append(
            {
                "repo": source["repo"],
                "index": source["index"],
                "count": count,
            }
        )

    federated = {
        "version": manifest.get("version", 1),
        "updated": date.today().isoformat(),
        "domain": manifest.get("domain"),
        "collision_policy": manifest.get("collision_policy", "warn"),
        "sources": source_summaries,
        "capabilities": sorted(merged, key=lambda item: item["id"]),
    }
    return federated, warnings


def write_federated_index(
    output_path: Path | None = None,
    manifest: dict[str, Any] | None = None,
) -> tuple[Path, list[str]]:
    """Compose the federated index and write it to disk as YAML.

    Args:
        output_path: Destination file; ``FEDERATED_INDEX_PATH`` when omitted.
            Missing parent directories are created.
        manifest: Passed through to ``compose_federated_index``.

    Returns:
        A ``(target, warnings)`` pair: the path written and the warnings
        reported by the composition step.
    """
    federated, warnings = compose_federated_index(manifest)
    target = output_path or FEDERATED_INDEX_PATH
    target.parent.mkdir(parents=True, exist_ok=True)
    header = (
        "# Composed federated capability index. Regenerate with:\n"
        "# reuse-surface federation compose\n"
    )
    target.write_text(
        # sort_keys=False keeps the key order built in compose_federated_index.
        header + yaml.safe_dump(federated, sort_keys=False),
        encoding="utf-8",
    )
    return target, warnings