from __future__ import annotations import os import re import urllib.error import urllib.request from datetime import date, datetime, timezone from pathlib import Path from typing import Any import yaml from jsonschema import Draft202012Validator from reuse_surface.registry import ROOT MANIFEST_PATH = ROOT / "registry" / "federation" / "sources.yaml" SCHEMA_PATH = ROOT / "schemas" / "federation.schema.yaml" FEDERATED_INDEX_PATH = ROOT / "registry" / "indexes" / "federated.yaml" CACHE_DIR = ROOT / "registry" / "federation" / "cache" DEFAULT_CACHE_TTL_SECONDS = 86400 USER_AGENT = "reuse-surface/0.1 federation-compose" def _expand_path(index_path: str) -> Path: return Path(index_path).expanduser() def load_federation_manifest(path: Path | None = None) -> dict[str, Any]: manifest_path = path or MANIFEST_PATH with manifest_path.open(encoding="utf-8") as handle: manifest = yaml.safe_load(handle) schema = yaml.safe_load(SCHEMA_PATH.read_text(encoding="utf-8")) validator = Draft202012Validator(schema) errors = sorted(validator.iter_errors(manifest), key=lambda err: err.path) if errors: messages = "; ".join(error.message for error in errors) raise ValueError(f"invalid federation manifest: {messages}") return manifest def _resolve_index_path(index_value: str) -> Path: path = _expand_path(index_value) if not path.is_absolute(): path = (ROOT / path).resolve() return path def _safe_repo_slug(repo: str) -> str: return re.sub(r"[^\w.-]", "_", repo) def _path_label(path: Path) -> str: try: return str(path.relative_to(ROOT)) except ValueError: return str(path) def _cache_root(cache_dir: Path | None = None) -> Path: return cache_dir or CACHE_DIR def _cache_paths(repo: str, cache_dir: Path | None = None) -> tuple[Path, Path]: root = _cache_root(cache_dir) slug = _safe_repo_slug(repo) return root / f"{slug}.yaml", root / f"{slug}.meta.yaml" def _read_cache_meta(meta_path: Path) -> dict[str, Any] | None: if not meta_path.exists(): return None data = yaml.safe_load(meta_path.read_text(encoding="utf-8")) return data if isinstance(data, dict) else None def _cache_is_fresh(meta: dict[str, Any], ttl_seconds: int) -> bool: if ttl_seconds <= 0: return False fetched_at = meta.get("fetched_at") if not fetched_at: return False fetched = datetime.fromisoformat(str(fetched_at)) if fetched.tzinfo is None: fetched = fetched.replace(tzinfo=timezone.utc) age = (datetime.now(timezone.utc) - fetched).total_seconds() return age < ttl_seconds def _auth_headers(source: dict[str, Any]) -> dict[str, str]: auth_env = source.get("auth_env") if not auth_env: return {} token = os.environ.get(auth_env) if not token: raise ValueError( f"auth env {auth_env} is not set for remote source {source['repo']}" ) header_name = source.get("auth_header", "Authorization") if header_name.lower() == "authorization" and not token.lower().startswith( ("bearer ", "basic ") ): token = f"Bearer {token}" return {header_name: token} def fetch_remote_index_text(url: str, source: dict[str, Any]) -> str: headers = {"User-Agent": USER_AGENT, **_auth_headers(source)} request = urllib.request.Request(url, headers=headers) try: with urllib.request.urlopen(request, timeout=30) as response: return response.read().decode("utf-8") except urllib.error.HTTPError as exc: raise ConnectionError( f"HTTP {exc.code} fetching {url} for {source['repo']}" ) from exc except urllib.error.URLError as exc: raise ConnectionError( f"failed to fetch {url} for {source['repo']}: {exc.reason}" ) from exc def _write_remote_cache( repo: str, url: str, content: str, cache_dir: Path | None = None ) -> Path: root = _cache_root(cache_dir) root.mkdir(parents=True, exist_ok=True) index_path, meta_path = _cache_paths(repo, cache_dir) index_path.write_text(content, encoding="utf-8") meta = { "fetched_at": datetime.now(timezone.utc).isoformat(), "url": url, "repo": repo, } meta_path.write_text(yaml.safe_dump(meta, sort_keys=False), encoding="utf-8") return index_path def resolve_source_index_path( source: dict[str, Any], *, refresh: bool = False, cache_dir: Path | None = None, ) -> tuple[Path | None, list[str]]: warnings: list[str] = [] if "index" in source: path = _resolve_index_path(source["index"]) if not path.exists(): message = f"missing index for {source['repo']}: {path}" if source.get("required", False): raise FileNotFoundError(message) warnings.append(message) return None, warnings return path, warnings url = source["url"] ttl_seconds = int(source.get("cache_ttl_seconds", DEFAULT_CACHE_TTL_SECONDS)) index_path, meta_path = _cache_paths(source["repo"], cache_dir) meta = _read_cache_meta(meta_path) use_cache = ( index_path.exists() and meta is not None and meta.get("url") == url and _cache_is_fresh(meta, ttl_seconds) and not refresh ) if use_cache: return index_path, warnings try: content = fetch_remote_index_text(url, source) except (ConnectionError, ValueError) as exc: if index_path.exists(): warnings.append( f"remote fetch failed for {source['repo']}, using stale cache: {exc}" ) return index_path, warnings message = f"remote index unavailable for {source['repo']}: {exc}" if source.get("required", False): raise FileNotFoundError(message) from exc warnings.append(message) return None, warnings return _write_remote_cache(source["repo"], url, content, cache_dir), warnings def compose_federated_index( manifest: dict[str, Any] | None = None, *, refresh: bool = False, cache_dir: Path | None = None, ) -> tuple[dict[str, Any], list[str]]: manifest = manifest or load_federation_manifest() warnings: list[str] = [] merged: list[dict[str, Any]] = [] seen_ids: dict[str, str] = {} source_summaries: list[dict[str, Any]] = [] for source in manifest["sources"]: if not source.get("enabled", False): continue index_path, source_warnings = resolve_source_index_path( source, refresh=refresh, cache_dir=cache_dir ) warnings.extend(source_warnings) if index_path is None: continue with index_path.open(encoding="utf-8") as handle: index_data = yaml.safe_load(handle) count = 0 for item in index_data.get("capabilities", []): cap_id = item["id"] if cap_id in seen_ids: warnings.append( f"duplicate id {cap_id}: {seen_ids[cap_id]} and {source['repo']}" ) else: seen_ids[cap_id] = source["repo"] federated_item = dict(item) federated_item["source_repo"] = source["repo"] if "url" in source: federated_item["source_url"] = source["url"] federated_item["source_index"] = _path_label(index_path) else: federated_item["source_index"] = source["index"] merged.append(federated_item) count += 1 summary: dict[str, Any] = { "repo": source["repo"], "count": count, } if "url" in source: summary["url"] = source["url"] summary["cache"] = _path_label(index_path) else: summary["index"] = source["index"] source_summaries.append(summary) federated = { "version": manifest.get("version", 1), "updated": date.today().isoformat(), "domain": manifest.get("domain"), "collision_policy": manifest.get("collision_policy", "warn"), "sources": source_summaries, "capabilities": sorted(merged, key=lambda item: item["id"]), } return federated, warnings def write_federated_index( output_path: Path | None = None, manifest: dict[str, Any] | None = None, *, refresh: bool = False, ) -> tuple[Path, list[str]]: federated, warnings = compose_federated_index(manifest, refresh=refresh) target = output_path or FEDERATED_INDEX_PATH target.parent.mkdir(parents=True, exist_ok=True) header = ( "# Composed federated capability index. Regenerate with:\n" "# reuse-surface federation compose\n" ) target.write_text( header + yaml.safe_dump(federated, sort_keys=False), encoding="utf-8", ) return target, warnings