Files
reuse-surface/reuse_surface/federation.py
tegwick e8797b2e91
Some checks failed
ci / validate-registry (push) Has been cancelled
Complete WP-0010: HTTP remote federation with cache
Extend federation manifest schema for url sources with auth and TTL metadata.
Fetch remote capability indexes over HTTP(S), cache under
registry/federation/cache/, and fall back to stale cache on fetch failure.
Add --refresh flag, seven federation tests, and updated federation docs.
2026-06-15 02:28:44 +02:00

256 lines
8.4 KiB
Python

from __future__ import annotations
import os
import re
import urllib.error
import urllib.request
from datetime import date, datetime, timezone
from pathlib import Path
from typing import Any
import yaml
from jsonschema import Draft202012Validator
from reuse_surface.registry import ROOT
# Default federation manifest, used by load_federation_manifest() when no
# explicit path is given.
MANIFEST_PATH = ROOT / "registry" / "federation" / "sources.yaml"
# JSON Schema (stored as YAML) that every loaded manifest is validated against.
SCHEMA_PATH = ROOT / "schemas" / "federation.schema.yaml"
# Default output location for the composed federated index.
FEDERATED_INDEX_PATH = ROOT / "registry" / "indexes" / "federated.yaml"
# Directory holding cached copies of remote indexes and their metadata files.
CACHE_DIR = ROOT / "registry" / "federation" / "cache"
# Cache lifetime applied when a url source sets no "cache_ttl_seconds"
# (86400 seconds = 24 hours).
DEFAULT_CACHE_TTL_SECONDS = 86400
# User-Agent header value sent with every remote index request.
USER_AGENT = "reuse-surface/0.1 federation-compose"
def _expand_path(index_path: str) -> Path:
return Path(index_path).expanduser()
def load_federation_manifest(path: Path | None = None) -> dict[str, Any]:
    """Load the federation manifest and validate it against the schema.

    Args:
        path: Manifest file to read; ``MANIFEST_PATH`` is used when omitted.

    Returns:
        The parsed manifest.

    Raises:
        ValueError: If the manifest violates the schema at ``SCHEMA_PATH``;
            the message joins every validation error with ``"; "``.
    """
    source_file = path if path else MANIFEST_PATH
    with source_file.open(encoding="utf-8") as stream:
        manifest = yaml.safe_load(stream)

    schema = yaml.safe_load(SCHEMA_PATH.read_text(encoding="utf-8"))
    problems = sorted(
        Draft202012Validator(schema).iter_errors(manifest),
        key=lambda err: err.path,
    )
    if not problems:
        return manifest

    joined = "; ".join(problem.message for problem in problems)
    raise ValueError(f"invalid federation manifest: {joined}")
def _resolve_index_path(index_value: str) -> Path:
    """Turn a manifest ``index`` value into a concrete filesystem path.

    Absolute paths (after ``~`` expansion) are returned unchanged; relative
    paths are anchored at ``ROOT`` and resolved.
    """
    candidate = _expand_path(index_value)
    if candidate.is_absolute():
        return candidate
    return (ROOT / candidate).resolve()
def _safe_repo_slug(repo: str) -> str:
return re.sub(r"[^\w.-]", "_", repo)
def _path_label(path: Path) -> str:
    """Return *path* relative to ``ROOT`` when possible, else unchanged.

    Used to produce the path strings stored in the federated index.
    """
    try:
        relative = path.relative_to(ROOT)
    except ValueError:
        # Not located under ROOT: fall back to the path as given.
        return str(path)
    return str(relative)
def _cache_paths(repo: str) -> tuple[Path, Path]:
    """Return the ``(index, metadata)`` cache file paths for *repo*.

    Both files live in ``CACHE_DIR`` and share the sanitized repo slug as
    their stem.
    """
    stem = _safe_repo_slug(repo)
    index_file = CACHE_DIR / f"{stem}.yaml"
    meta_file = CACHE_DIR / f"{stem}.meta.yaml"
    return index_file, meta_file
def _read_cache_meta(meta_path: Path) -> dict[str, Any] | None:
if not meta_path.exists():
return None
data = yaml.safe_load(meta_path.read_text(encoding="utf-8"))
return data if isinstance(data, dict) else None
def _cache_is_fresh(meta: dict[str, Any], ttl_seconds: int) -> bool:
if ttl_seconds <= 0:
return False
fetched_at = meta.get("fetched_at")
if not fetched_at:
return False
fetched = datetime.fromisoformat(str(fetched_at))
if fetched.tzinfo is None:
fetched = fetched.replace(tzinfo=timezone.utc)
age = (datetime.now(timezone.utc) - fetched).total_seconds()
return age < ttl_seconds
def _auth_headers(source: dict[str, Any]) -> dict[str, str]:
auth_env = source.get("auth_env")
if not auth_env:
return {}
token = os.environ.get(auth_env)
if not token:
raise ValueError(
f"auth env {auth_env} is not set for remote source {source['repo']}"
)
header_name = source.get("auth_header", "Authorization")
if header_name.lower() == "authorization" and not token.lower().startswith(
("bearer ", "basic ")
):
token = f"Bearer {token}"
return {header_name: token}
def fetch_remote_index_text(url: str, source: dict[str, Any]) -> str:
    """Download a remote capability index and return it as text.

    Args:
        url: Location of the remote index.
        source: Manifest entry for the source; supplies the auth settings and
            the ``repo`` name used in error messages.

    Returns:
        The response body decoded as UTF-8.

    Raises:
        ConnectionError: On an HTTP error status, a URL/connection failure,
            or a transport failure while reading the body (timeout,
            truncated response).
        ValueError: If the configured auth environment variable is not set.
    """
    # Function-scope import so the module's import block stays untouched.
    import http.client

    headers = {"User-Agent": USER_AGENT, **_auth_headers(source)}
    request = urllib.request.Request(url, headers=headers)
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            return response.read().decode("utf-8")
    except urllib.error.HTTPError as exc:
        raise ConnectionError(
            f"HTTP {exc.code} fetching {url} for {source['repo']}"
        ) from exc
    except urllib.error.URLError as exc:
        raise ConnectionError(
            f"failed to fetch {url} for {source['repo']}: {exc.reason}"
        ) from exc
    except (OSError, http.client.HTTPException) as exc:
        # Failures while reading the body (socket timeout, IncompleteRead,
        # connection reset) are not wrapped in URLError by urllib; translate
        # them as well so callers see one exception type for fetch failures.
        raise ConnectionError(
            f"failed to fetch {url} for {source['repo']}: {exc}"
        ) from exc
def _write_remote_cache(repo: str, url: str, content: str) -> Path:
    """Store a fetched index and its metadata in the cache directory.

    Writes *content* to the repo's cache index file, then records the fetch
    time (UTC, ISO 8601), the source *url* and the *repo* name in the
    companion metadata file.

    Returns:
        Path of the cached index file.
    """
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    index_file, meta_file = _cache_paths(repo)

    index_file.write_text(content, encoding="utf-8")

    fetched_at = datetime.now(timezone.utc).isoformat()
    meta_yaml = yaml.safe_dump(
        {"fetched_at": fetched_at, "url": url, "repo": repo},
        sort_keys=False,
    )
    meta_file.write_text(meta_yaml, encoding="utf-8")
    return index_file
def resolve_source_index_path(
    source: dict[str, Any],
    *,
    refresh: bool = False,
) -> tuple[Path | None, list[str]]:
    """Locate the index file for one federation source.

    Local sources (those with an ``index`` key) resolve to a filesystem path.
    Remote sources (``url``) are served from the cache while it is fresh and
    was fetched from the same URL; otherwise they are downloaded and cached.
    When the download fails, an existing cache file is reused even if stale.

    Args:
        source: One entry of the manifest's ``sources`` list.
        refresh: When true, bypass a fresh cache and fetch again.

    Returns:
        ``(path, warnings)``; ``path`` is ``None`` when an optional source
        has no usable index.

    Raises:
        FileNotFoundError: If a source marked ``required`` has no usable
            index (missing local file, or fetch failure without any cache).
    """
    notes: list[str] = []

    # Local source: the index must already exist on disk.
    if "index" in source:
        local_path = _resolve_index_path(source["index"])
        if local_path.exists():
            return local_path, notes
        message = f"missing index for {source['repo']}: {local_path}"
        if source.get("required", False):
            raise FileNotFoundError(message)
        notes.append(message)
        return None, notes

    # Remote source: consult the cache first.
    url = source["url"]
    ttl_seconds = int(source.get("cache_ttl_seconds", DEFAULT_CACHE_TTL_SECONDS))
    index_path, meta_path = _cache_paths(source["repo"])
    meta = _read_cache_meta(meta_path)

    cache_hit = False
    if index_path.exists() and meta is not None and meta.get("url") == url:
        cache_hit = _cache_is_fresh(meta, ttl_seconds) and not refresh
    if cache_hit:
        return index_path, notes

    try:
        content = fetch_remote_index_text(url, source)
    except (ConnectionError, ValueError) as exc:
        if index_path.exists():
            # Prefer stale data over no data when the remote is unreachable.
            notes.append(
                f"remote fetch failed for {source['repo']}, using stale cache: {exc}"
            )
            return index_path, notes
        message = f"remote index unavailable for {source['repo']}: {exc}"
        if source.get("required", False):
            raise FileNotFoundError(message) from exc
        notes.append(message)
        return None, notes
    else:
        return _write_remote_cache(source["repo"], url, content), notes
def compose_federated_index(
    manifest: dict[str, Any] | None = None,
    *,
    refresh: bool = False,
) -> tuple[dict[str, Any], list[str]]:
    """Merge the capability indexes of all enabled sources into one index.

    Each capability is copied and annotated with ``source_repo``,
    ``source_index`` and, for remote sources, ``source_url``. Duplicate ids
    across sources produce a warning; every occurrence is still included.

    Args:
        manifest: Parsed federation manifest; loaded from the default
            location when omitted (or empty).
        refresh: Forwarded to ``resolve_source_index_path`` to bypass fresh
            caches of remote sources.

    Returns:
        ``(federated, warnings)`` where ``federated`` is the composed index
        (capabilities sorted by ``id``) and ``warnings`` lists non-fatal
        problems encountered.

    Raises:
        FileNotFoundError: If a ``required`` source has no usable index.
        ValueError: If a ``required`` source's index is not a YAML mapping.
    """
    manifest = manifest or load_federation_manifest()
    warnings: list[str] = []
    merged: list[dict[str, Any]] = []
    seen_ids: dict[str, str] = {}
    source_summaries: list[dict[str, Any]] = []

    for source in manifest["sources"]:
        # Sources are opt-in: a missing "enabled" key means disabled.
        if not source.get("enabled", False):
            continue
        index_path, source_warnings = resolve_source_index_path(
            source, refresh=refresh
        )
        warnings.extend(source_warnings)
        if index_path is None:
            continue

        with index_path.open(encoding="utf-8") as handle:
            index_data = yaml.safe_load(handle)
        if not isinstance(index_data, dict):
            # An empty file or a non-mapping document (e.g. a cached error
            # page) has no capabilities to offer; skip it instead of crashing.
            message = (
                f"invalid index for {source['repo']}: "
                f"expected a mapping in {_path_label(index_path)}"
            )
            if source.get("required", False):
                raise ValueError(message)
            warnings.append(message)
            continue

        is_remote = "url" in source
        count = 0
        # "or []" tolerates an explicit ``capabilities: null``.
        for item in index_data.get("capabilities") or []:
            cap_id = item["id"]
            if cap_id in seen_ids:
                warnings.append(
                    f"duplicate id {cap_id}: {seen_ids[cap_id]} and {source['repo']}"
                )
            else:
                seen_ids[cap_id] = source["repo"]
            federated_item = dict(item)
            federated_item["source_repo"] = source["repo"]
            if is_remote:
                federated_item["source_url"] = source["url"]
                federated_item["source_index"] = _path_label(index_path)
            else:
                federated_item["source_index"] = source["index"]
            merged.append(federated_item)
            count += 1

        summary: dict[str, Any] = {
            "repo": source["repo"],
            "count": count,
        }
        if is_remote:
            summary["url"] = source["url"]
            summary["cache"] = _path_label(index_path)
        else:
            summary["index"] = source["index"]
        source_summaries.append(summary)

    federated = {
        "version": manifest.get("version", 1),
        "updated": date.today().isoformat(),
        "domain": manifest.get("domain"),
        "collision_policy": manifest.get("collision_policy", "warn"),
        "sources": source_summaries,
        "capabilities": sorted(merged, key=lambda item: item["id"]),
    }
    return federated, warnings
def write_federated_index(
    output_path: Path | None = None,
    manifest: dict[str, Any] | None = None,
    *,
    refresh: bool = False,
) -> tuple[Path, list[str]]:
    """Compose the federated index and write it to disk as YAML.

    Args:
        output_path: Destination file; ``FEDERATED_INDEX_PATH`` when omitted.
        manifest: Parsed federation manifest, forwarded to
            ``compose_federated_index``.
        refresh: Forwarded to ``compose_federated_index``.

    Returns:
        ``(path, warnings)``: the file written and the warnings collected
        while composing.
    """
    federated, warnings = compose_federated_index(manifest, refresh=refresh)

    destination = output_path if output_path else FEDERATED_INDEX_PATH
    destination.parent.mkdir(parents=True, exist_ok=True)

    # Comment banner telling readers the file is generated.
    banner = "".join(
        (
            "# Composed federated capability index. Regenerate with:\n",
            "# reuse-surface federation compose\n",
        )
    )
    body = yaml.safe_dump(federated, sort_keys=False)
    destination.write_text(banner + body, encoding="utf-8")
    return destination, warnings