"""Catalog loader — walks a catalog directory tree and parses YAML files."""

from __future__ import annotations

import logging
from pathlib import Path
from typing import Any

import yaml

from bridge.catalog.models import (
    ActorClass,
    Catalog,
    CatalogBridge,
    CatalogDomain,
    CatalogTarget,
)
from bridge.models import HealthCheckConfig, ReconnectPolicy

log = logging.getLogger(__name__)


class CatalogLoadError(Exception):
    """Raised when catalog loading fails."""


def load_catalog(path: Path) -> Catalog:
    """Walk the catalog directory and return a populated Catalog.

    Every ``*.yaml`` file found recursively under *path* is parsed and
    registered in the returned catalog according to its ``type`` field.

    Args:
        path: Root of the catalog tree (anything ``Path()`` accepts).

    Returns:
        A ``Catalog`` holding every entry that was parsed.

    Raises:
        CatalogLoadError: If *path* does not exist, or a file contains
            invalid YAML or an invalid entry.
        OSError: If a file cannot be opened or read (propagated unchanged).
    """
    path = Path(path)
    if not path.exists():
        raise CatalogLoadError(f"Catalog path not found: {path}")

    catalog = Catalog()
    # Sorted so files are always processed in the same order, regardless of
    # the order in which the filesystem happens to list them.
    for yaml_file in sorted(path.rglob("*.yaml")):
        _load_file(yaml_file, catalog)
    return catalog


def _load_file(path: Path, catalog: Catalog) -> None:
    """Parse one YAML file and register its entry in *catalog*.

    Files whose top level is not a mapping, that have no ``type`` field, or
    whose ``type`` is unknown are skipped with a warning.

    Raises:
        CatalogLoadError: On invalid YAML (including undecodable bytes) or
            when the entry cannot be parsed.
        OSError: If the file cannot be opened or read (propagated unchanged).
    """
    try:
        # Binary mode on purpose: PyYAML then detects the encoding itself
        # (UTF-16 via BOM, otherwise UTF-8) instead of depending on the
        # platform's default text encoding, and reports undecodable input as
        # a yaml.YAMLError, which is handled just below.
        with path.open("rb") as f:
            data = yaml.safe_load(f)
    except yaml.YAMLError as e:
        raise CatalogLoadError(f"Invalid YAML in {path}: {e}") from e

    if not isinstance(data, dict):
        log.warning("Skipping %s: not a YAML mapping", path)
        return

    entry_type = data.get("type")
    if not entry_type:
        log.warning("Skipping %s: no 'type' field", path)
        return

    try:
        if entry_type == "domain":
            entry = _parse_domain(data, path)
            catalog.domains[entry.id] = entry
        elif entry_type == "target":
            entry = _parse_target(data, path)
            catalog.targets[entry.id] = entry
        elif entry_type == "bridge":
            entry = _parse_bridge(data, path)
            catalog.bridges[entry.id] = entry
        elif entry_type == "actor":
            entry = _parse_actor(data, path)
            catalog.actors[entry.id] = entry
        else:
            log.warning("Skipping %s: unknown type '%s'", path, entry_type)
    except CatalogLoadError:
        # Already carries a precise message; do not wrap it a second time.
        raise
    except Exception as e:
        # Conversion failures (e.g. int("abc")) and model constructor errors
        # are reported together with the offending file.
        raise CatalogLoadError(f"Error parsing {path}: {e}") from e


def _require(data: dict, field: str, path: Path) -> Any:
    """Return ``data[field]``, raising if it is absent or null.

    A key that is present with a null value (``id:`` or ``id: ~``) counts as
    missing; otherwise ``str(None)`` would turn it into the literal "None".

    Raises:
        CatalogLoadError: If *field* is not in *data* or its value is None.
    """
    value = data.get(field)
    if value is None:
        raise CatalogLoadError(f"Missing required field '{field}' in {path}")
    return value


def _optional_str(data: dict, field: str, default: str = "") -> str:
    """Return ``data[field]`` as a string, or *default* if absent or null."""
    value = data.get(field)
    return default if value is None else str(value)


def _optional_int(data: dict, field: str, default: int) -> int:
    """Return ``data[field]`` as an int, or *default* if absent or null."""
    value = data.get(field)
    return default if value is None else int(value)


def _optional_section(data: dict, field: str, path: Path) -> dict | None:
    """Return the nested mapping at *field*, or None if absent or empty.

    Raises:
        CatalogLoadError: If the value is non-empty but not a mapping.
    """
    section = data.get(field)
    if not section:
        return None
    if not isinstance(section, dict):
        raise CatalogLoadError(f"Field '{field}' must be a mapping in {path}")
    return section


def _parse_domain(data: dict, path: Path) -> CatalogDomain:
    """Build a CatalogDomain from a ``type: domain`` mapping."""
    return CatalogDomain(
        id=str(_require(data, "id", path)),
        name=str(_require(data, "name", path)),
        description=_optional_str(data, "description"),
        environment=_optional_str(data, "environment"),
    )


def _parse_target(data: dict, path: Path) -> CatalogTarget:
    """Build a CatalogTarget from a ``type: target`` mapping."""
    reachable_via = data.get("reachable_via") or []
    if isinstance(reachable_via, str):
        # A bare scalar (``reachable_via: bridge-a``) would otherwise be
        # exploded into single characters by list().
        reachable_via = [reachable_via]

    return CatalogTarget(
        id=str(_require(data, "id", path)),
        domain=str(_require(data, "domain", path)),
        kind=str(_require(data, "kind", path)),
        description=_optional_str(data, "description"),
        reachable_via=list(reachable_via),
    )


def _parse_bridge(data: dict, path: Path) -> CatalogBridge:
    """Build a CatalogBridge from a ``type: bridge`` mapping.

    The optional ``health_check`` and ``reconnect`` sub-mappings are turned
    into HealthCheckConfig / ReconnectPolicy objects; when absent or empty
    the corresponding attribute is None.
    """
    health_check = None
    hc = _optional_section(data, "health_check", path)
    if hc is not None:
        health_check = HealthCheckConfig(
            url=str(_require(hc, "url", path)),
            interval_seconds=_optional_int(hc, "interval_seconds", 30),
            timeout_seconds=_optional_int(hc, "timeout_seconds", 5),
        )

    reconnect = None
    policy = _optional_section(data, "reconnect", path)
    if policy is not None:
        reconnect = ReconnectPolicy(
            max_attempts=_optional_int(policy, "max_attempts", 0),
            backoff_initial=_optional_int(policy, "backoff_initial", 5),
            backoff_max=_optional_int(policy, "backoff_max", 60),
        )

    return CatalogBridge(
        id=str(_require(data, "id", path)),
        domain=str(_require(data, "domain", path)),
        target=str(_require(data, "target", path)),
        host=str(_require(data, "host", path)),
        remote_port=int(_require(data, "remote_port", path)),
        local_port=int(_require(data, "local_port", path)),
        ssh_user=str(_require(data, "ssh_user", path)),
        ssh_key=str(_require(data, "ssh_key", path)),
        actor=str(_require(data, "actor", path)),
        description=_optional_str(data, "description"),
        access_method=_optional_str(data, "access_method", "ssh-reverse"),
        health_check=health_check,
        reconnect=reconnect,
    )


def _parse_actor(data: dict, path: Path) -> ActorClass:
    """Build an ActorClass from a ``type: actor`` mapping.

    The YAML key is ``class``; it is passed to the model as ``actor_class``.
    """
    return ActorClass(
        id=str(_require(data, "id", path)),
        actor_class=str(_require(data, "class", path)),
        description=_optional_str(data, "description"),
    )