diff --git a/src/bridge/catalog/__init__.py b/src/bridge/catalog/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/bridge/catalog/loader.py b/src/bridge/catalog/loader.py new file mode 100644 index 0000000..5450111 --- /dev/null +++ b/src/bridge/catalog/loader.py @@ -0,0 +1,142 @@ +"""Catalog loader — walks a catalog directory tree and parses YAML files.""" +from __future__ import annotations + +import logging +import warnings +from pathlib import Path +from typing import Any, Dict + +import yaml + +from bridge.catalog.models import ( + ActorClass, + Catalog, + CatalogBridge, + CatalogDomain, + CatalogTarget, +) +from bridge.models import HealthCheckConfig, ReconnectPolicy + +log = logging.getLogger(__name__) + + +class CatalogLoadError(Exception): + """Raised when catalog loading fails.""" + + +def load_catalog(path: Path) -> Catalog: + """Walk the catalog directory and return a populated Catalog.""" + path = Path(path) + if not path.exists(): + raise CatalogLoadError(f"Catalog path not found: {path}") + + catalog = Catalog() + for yaml_file in sorted(path.rglob("*.yaml")): + _load_file(yaml_file, catalog) + return catalog + + +def _load_file(path: Path, catalog: Catalog) -> None: + try: + with path.open() as f: + data = yaml.safe_load(f) + except yaml.YAMLError as e: + raise CatalogLoadError(f"Invalid YAML in {path}: {e}") from e + + if not isinstance(data, dict): + log.warning("Skipping %s: not a YAML mapping", path) + return + + entry_type = data.get("type") + if not entry_type: + log.warning("Skipping %s: no 'type' field", path) + return + + try: + if entry_type == "domain": + entry = _parse_domain(data, path) + catalog.domains[entry.id] = entry + elif entry_type == "target": + entry = _parse_target(data, path) + catalog.targets[entry.id] = entry + elif entry_type == "bridge": + entry = _parse_bridge(data, path) + catalog.bridges[entry.id] = entry + elif entry_type == "actor": + entry = _parse_actor(data, path) + catalog.actors[entry.id] = entry + else: + log.warning("Skipping %s: unknown type '%s'", path, entry_type) + except CatalogLoadError: + raise + except Exception as e: + raise CatalogLoadError(f"Error parsing {path}: {e}") from e + + +def _require(data: dict, field: str, path: Path) -> Any: + if field not in data: + raise CatalogLoadError(f"Missing required field '{field}' in {path}") + return data[field] + + +def _parse_domain(data: dict, path: Path) -> CatalogDomain: + return CatalogDomain( + id=str(_require(data, "id", path)), + name=str(_require(data, "name", path)), + description=str(data.get("description", "")), + environment=str(data.get("environment", "")), + ) + + +def _parse_target(data: dict, path: Path) -> CatalogTarget: + return CatalogTarget( + id=str(_require(data, "id", path)), + domain=str(_require(data, "domain", path)), + kind=str(_require(data, "kind", path)), + description=str(data.get("description", "")), + reachable_via=list(data.get("reachable_via") or []), + ) + + +def _parse_bridge(data: dict, path: Path) -> CatalogBridge: + health_check = None + if "health_check" in data and data["health_check"]: + hc = data["health_check"] + health_check = HealthCheckConfig( + url=str(_require(hc, "url", path)), + interval_seconds=int(hc.get("interval_seconds", 30)), + timeout_seconds=int(hc.get("timeout_seconds", 5)), + ) + + reconnect = None + if "reconnect" in data and data["reconnect"]: + r = data["reconnect"] + reconnect = ReconnectPolicy( + max_attempts=int(r.get("max_attempts", 0)), + backoff_initial=int(r.get("backoff_initial", 5)), + backoff_max=int(r.get("backoff_max", 60)), + ) + + return CatalogBridge( + id=str(_require(data, "id", path)), + domain=str(_require(data, "domain", path)), + target=str(_require(data, "target", path)), + host=str(_require(data, "host", path)), + remote_port=int(_require(data, "remote_port", path)), + local_port=int(_require(data, "local_port", path)), + ssh_user=str(_require(data, "ssh_user", path)), + ssh_key=str(_require(data, "ssh_key", path)), + actor=str(_require(data, "actor", path)), + description=str(data.get("description", "")), + access_method=str(data.get("access_method", "ssh-reverse")), + health_check=health_check, + reconnect=reconnect, + ) + + +def _parse_actor(data: dict, path: Path) -> ActorClass: + return ActorClass( + id=str(_require(data, "id", path)), + actor_class=str(_require(data, "class", path)), + description=str(data.get("description", "")), + ) diff --git a/src/bridge/catalog/models.py b/src/bridge/catalog/models.py new file mode 100644 index 0000000..45698da --- /dev/null +++ b/src/bridge/catalog/models.py @@ -0,0 +1,69 @@ +"""Domain models for OpsCatalog.""" +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Dict, List, Optional + +from bridge.models import HealthCheckConfig, ReconnectPolicy, TunnelConfig + + +@dataclass +class CatalogDomain: + id: str + name: str + description: str = "" + environment: str = "" + + +@dataclass +class CatalogTarget: + id: str + domain: str + kind: str + description: str = "" + reachable_via: List[str] = field(default_factory=list) + + +@dataclass +class CatalogBridge: + id: str + domain: str + target: str + host: str + remote_port: int + local_port: int + ssh_user: str + ssh_key: str + actor: str + description: str = "" + access_method: str = "ssh-reverse" + health_check: Optional[HealthCheckConfig] = None + reconnect: Optional[ReconnectPolicy] = None + + def to_tunnel_config(self) -> TunnelConfig: + return TunnelConfig( + name=self.id, + host=self.host, + remote_port=self.remote_port, + local_port=self.local_port, + ssh_user=self.ssh_user, + ssh_key=self.ssh_key, + actor=self.actor, + reconnect=self.reconnect if self.reconnect is not None else ReconnectPolicy(), + health_check=self.health_check, + ) + + +@dataclass +class ActorClass: + id: str + actor_class: str + description: str = "" + + +@dataclass +class Catalog: + domains: Dict[str, CatalogDomain] = field(default_factory=dict) + targets: Dict[str, CatalogTarget] = field(default_factory=dict) + bridges: Dict[str, CatalogBridge] = field(default_factory=dict) + actors: Dict[str, ActorClass] = field(default_factory=dict) diff --git a/src/bridge/catalog/resolver.py b/src/bridge/catalog/resolver.py new file mode 100644 index 0000000..8d829f4 --- /dev/null +++ b/src/bridge/catalog/resolver.py @@ -0,0 +1,35 @@ +"""Catalog resolver — resolves a bridge name to a TunnelConfig.""" +from __future__ import annotations + +from typing import Dict, Optional + +from bridge.catalog.models import Catalog +from bridge.models import TunnelConfig + + +class BridgeNotFound(Exception): + """Raised when a bridge name cannot be resolved from inline config or catalog.""" + + +def resolve( + name: str, + catalog: Optional[Catalog], + inline_tunnels: Dict[str, TunnelConfig], +) -> TunnelConfig: + """Resolve bridge name to TunnelConfig. + + Lookup order: + 1. inline_tunnels (from tunnels.yaml) — wins if present + 2. catalog bridges — fallback + 3. raises BridgeNotFound if neither has the name + """ + if name in inline_tunnels: + return inline_tunnels[name] + + if catalog is not None and name in catalog.bridges: + return catalog.bridges[name].to_tunnel_config() + + raise BridgeNotFound( + f"Bridge '{name}' not found in inline config" + + (" or catalog" if catalog is not None else " (no catalog configured)") + ) diff --git a/src/bridge/catalog/validator.py b/src/bridge/catalog/validator.py new file mode 100644 index 0000000..0541e3b --- /dev/null +++ b/src/bridge/catalog/validator.py @@ -0,0 +1,42 @@ +"""Catalog validator — cross-reference checks for catalog consistency.""" +from __future__ import annotations + +from typing import List + +from bridge.catalog.models import Catalog + + +class ValidationError(Exception): + """Raised when catalog validation fails (used for programmatic access).""" + + +def validate_catalog(catalog: Catalog) -> List[str]: + """Return a list of validation error strings (empty = valid).""" + errors: List[str] = [] + + for target in catalog.targets.values(): + if target.domain not in catalog.domains: + errors.append( + f"Target '{target.id}': domain '{target.domain}' does not exist in catalog" + ) + for bridge_id in target.reachable_via: + if bridge_id not in catalog.bridges: + errors.append( + f"Target '{target.id}': reachable_via references unknown bridge '{bridge_id}'" + ) + + for bridge in catalog.bridges.values(): + if bridge.domain not in catalog.domains: + errors.append( + f"Bridge '{bridge.id}': domain '{bridge.domain}' does not exist in catalog" + ) + if bridge.target not in catalog.targets: + errors.append( + f"Bridge '{bridge.id}': target '{bridge.target}' does not exist in catalog" + ) + if bridge.actor not in catalog.actors: + errors.append( + f"Bridge '{bridge.id}': actor '{bridge.actor}' does not exist in catalog" + ) + + return errors diff --git a/src/bridge/cli.py b/src/bridge/cli.py index 9654296..88f0201 100644 --- a/src/bridge/cli.py +++ b/src/bridge/cli.py @@ -3,7 +3,6 @@ from __future__ import annotations import json import os -import sys from pathlib import Path from typing import Optional @@ -21,6 +20,12 @@ app = typer.Typer( no_args_is_help=True, ) +targets_app = typer.Typer(help="Inspect infrastructure targets from the OpsCatalog.") +catalog_app = typer.Typer(help="Inspect and validate the OpsCatalog.") + +app.add_typer(targets_app, name="targets") +app.add_typer(catalog_app, name="catalog") + def _state_dir() -> Path: return Path(os.environ.get("BRIDGE_STATE_DIR", str(Path.home() / ".local" / "state" / "bridge"))) @@ -34,85 +39,129 @@ def _load_or_exit(): raise typer.Exit(1) -def _require_tunnel(cfg, name: str): - if name not in cfg.tunnels: - typer.echo(f"Error: tunnel '{name}' not found in config", err=True) +def _load_catalog_or_exit(cfg): + from bridge.catalog.loader import CatalogLoadError, load_catalog + if cfg.catalog_path is None: + typer.echo("Error: catalog_path not configured in tunnels.yaml", err=True) + raise typer.Exit(1) + try: + return load_catalog(cfg.catalog_path) + except Exception as e: + typer.echo(f"Error loading catalog: {e}", err=True) raise typer.Exit(1) - return cfg.tunnels[name] +def _resolve_tunnel(cfg, name: str): + """Resolve tunnel name: inline first, then catalog, then error.""" + from bridge.catalog.loader import load_catalog + from bridge.catalog.resolver import BridgeNotFound, resolve + + catalog = None + if cfg.catalog_path is not None: + try: + catalog = load_catalog(cfg.catalog_path) + except Exception: + pass + + try: + return resolve(name, catalog=catalog, inline_tunnels=cfg.tunnels) + except BridgeNotFound: + typer.echo(f"Error: tunnel '{name}' not found in config or catalog", err=True) + raise typer.Exit(1) + + +def _all_tunnel_names(cfg): + """Return names from inline config (all-tunnels operations use inline only).""" + return list(cfg.tunnels.keys()) + + +# ─── Tunnel lifecycle commands ──────────────────────────────────────────────── + @app.command() def up( - tunnel: Optional[str] = typer.Argument(None, help="Tunnel name (omit for all)"), + tunnel: Optional[str] = typer.Argument(None, help="Tunnel name (omit for all inline)"), ): """Start one or all tunnels.""" cfg = _load_or_exit() sd = _state_dir() - names = [tunnel] if tunnel else list(cfg.tunnels.keys()) if tunnel: - _require_tunnel(cfg, tunnel) - - any_already_running = False - for name in names: - tcfg = cfg.tunnels[name] + tcfg = _resolve_tunnel(cfg, tunnel) mgr = TunnelManager(tcfg, state_dir=sd) if mgr.is_running(): - typer.echo(f"Tunnel '{name}' is already running.") - any_already_running = True - else: - mgr.start() - typer.echo(f"Started tunnel '{name}'.") - - if any_already_running and len(names) == 1: - raise typer.Exit(2) + typer.echo(f"Tunnel '{tunnel}' is already running.") + raise typer.Exit(2) + mgr.start() + typer.echo(f"Started tunnel '{tunnel}'.") + else: + names = _all_tunnel_names(cfg) + any_already_running = False + for name in names: + tcfg = cfg.tunnels[name] + mgr = TunnelManager(tcfg, state_dir=sd) + if mgr.is_running(): + typer.echo(f"Tunnel '{name}' is already running.") + any_already_running = True + else: + mgr.start() + typer.echo(f"Started tunnel '{name}'.") + if any_already_running and len(names) == 1: + raise typer.Exit(2) @app.command() def down( - tunnel: Optional[str] = typer.Argument(None, help="Tunnel name (omit for all)"), + tunnel: Optional[str] = typer.Argument(None, help="Tunnel name (omit for all inline)"), ): """Stop one or all tunnels.""" cfg = _load_or_exit() sd = _state_dir() - names = [tunnel] if tunnel else list(cfg.tunnels.keys()) if tunnel: - _require_tunnel(cfg, tunnel) - - any_not_running = False - for name in names: - tcfg = cfg.tunnels[name] + tcfg = _resolve_tunnel(cfg, tunnel) mgr = TunnelManager(tcfg, state_dir=sd) if not mgr.is_running(): - typer.echo(f"Tunnel '{name}' is not running.") - any_not_running = True - else: - mgr.stop() - typer.echo(f"Stopped tunnel '{name}'.") - - if any_not_running and len(names) == 1: - raise typer.Exit(2) + typer.echo(f"Tunnel '{tunnel}' is not running.") + raise typer.Exit(2) + mgr.stop() + typer.echo(f"Stopped tunnel '{tunnel}'.") + else: + names = _all_tunnel_names(cfg) + any_not_running = False + for name in names: + tcfg = cfg.tunnels[name] + mgr = TunnelManager(tcfg, state_dir=sd) + if not mgr.is_running(): + typer.echo(f"Tunnel '{name}' is not running.") + any_not_running = True + else: + mgr.stop() + typer.echo(f"Stopped tunnel '{name}'.") + if any_not_running and len(names) == 1: + raise typer.Exit(2) @app.command() def restart( - tunnel: Optional[str] = typer.Argument(None, help="Tunnel name (omit for all)"), + tunnel: Optional[str] = typer.Argument(None, help="Tunnel name (omit for all inline)"), ): """Restart one or all tunnels.""" cfg = _load_or_exit() sd = _state_dir() - names = [tunnel] if tunnel else list(cfg.tunnels.keys()) if tunnel: - _require_tunnel(cfg, tunnel) - - for name in names: - tcfg = cfg.tunnels[name] + tcfg = _resolve_tunnel(cfg, tunnel) mgr = TunnelManager(tcfg, state_dir=sd) mgr.stop() mgr.start() - typer.echo(f"Restarted tunnel '{name}'.") + typer.echo(f"Restarted tunnel '{tunnel}'.") + else: + for name in _all_tunnel_names(cfg): + tcfg = cfg.tunnels[name] + mgr = TunnelManager(tcfg, state_dir=sd) + mgr.stop() + mgr.start() + typer.echo(f"Restarted tunnel '{name}'.") @app.command() @@ -134,8 +183,8 @@ def status( "actor": tcfg.actor, "host": tcfg.host, "pid": pid, - "uptime": None, # future: track start time - "health": None, # future: last health check result + "uptime": None, + "health": None, }) if as_json: @@ -145,8 +194,14 @@ def status( def _print_status_table(rows): + if not rows: + typer.echo("No tunnels configured.") + return headers = ["TUNNEL", "STATE", "ACTOR", "HOST", "PID"] - col_widths = [max(len(h), max((len(str(r.get(h.lower(), "") or "")) for r in rows), default=0)) for h in headers] + col_widths = [ + max(len(h), max((len(str(r.get(h.lower(), "") or "")) for r in rows), default=0)) + for h in headers + ] def _fmt_row(vals): return " ".join(str(v).ljust(w) for v, w in zip(vals, col_widths)) @@ -171,7 +226,7 @@ def logs( ): """Show audit log for a tunnel.""" cfg = _load_or_exit() - _require_tunnel(cfg, tunnel) + _resolve_tunnel(cfg, tunnel) # validate name sd = _state_dir() logger = AuditLogger(state_dir=sd) @@ -181,7 +236,6 @@ def logs( typer.echo(f"No log entries for tunnel '{tunnel}'.") return - # Show last N lines for entry in events[-lines:]: ts = entry.get("timestamp", "") event = entry.get("event", "") @@ -197,7 +251,7 @@ def logs( log_path = sd / f"{tunnel}.log" try: with log_path.open() as f: - f.seek(0, 2) # seek to end + f.seek(0, 2) while True: line = f.readline() if line: @@ -217,3 +271,201 @@ def logs( time.sleep(0.5) except KeyboardInterrupt: pass + + +# ─── targets commands ───────────────────────────────────────────────────────── + +@targets_app.callback(invoke_without_command=True) +def targets_default( + ctx: typer.Context, + domain: Optional[str] = typer.Option(None, "--domain", help="Filter by domain"), + as_json: bool = typer.Option(False, "--json", help="Output as JSON"), +): + """List infrastructure targets from the OpsCatalog.""" + if ctx.invoked_subcommand is not None: + return + cfg = _load_or_exit() + cat = _load_catalog_or_exit(cfg) + + rows = [] + for t in cat.targets.values(): + if domain and t.domain != domain: + continue + rows.append({ + "domain": t.domain, + "target": t.id, + "kind": t.kind, + "description": t.description, + "bridges": t.reachable_via, + }) + + if as_json: + typer.echo(json.dumps(rows, indent=2)) + else: + if not rows: + typer.echo("No targets found.") + return + headers = ["DOMAIN", "TARGET", "KIND", "BRIDGES"] + col_widths = [ + max(len(h), max((len(str(r.get(h.lower(), "") or "")) for r in rows), default=0)) + for h in headers + ] + + def _fmt(vals): + return " ".join(str(v).ljust(w) for v, w in zip(vals, col_widths)) + + typer.echo(_fmt(headers)) + typer.echo(_fmt(["-" * w for w in col_widths])) + for row in rows: + typer.echo(_fmt([ + row["domain"], + row["target"], + row["kind"], + ", ".join(row["bridges"]), + ])) + + +@targets_app.command("show") +def targets_show( + target: str = typer.Argument(..., help="Target ID"), +): + """Show full metadata for a target.""" + cfg = _load_or_exit() + cat = _load_catalog_or_exit(cfg) + + if target not in cat.targets: + typer.echo(f"Error: target '{target}' not found in catalog", err=True) + raise typer.Exit(1) + + t = cat.targets[target] + typer.echo(f"Target: {t.id}") + typer.echo(f"Domain: {t.domain}") + typer.echo(f"Kind: {t.kind}") + if t.description: + typer.echo(f"Description: {t.description}") + if t.reachable_via: + typer.echo(f"Bridges: {', '.join(t.reachable_via)}") + + # Show ops notes from docs/ if available + if cfg.catalog_path: + docs_dir = cfg.catalog_path / "domains" / t.domain / "docs" + if docs_dir.exists(): + for md_file in sorted(docs_dir.glob("*.md")): + typer.echo(f"\n--- {md_file.name} ---") + typer.echo(md_file.read_text()) + + +# ─── catalog commands ───────────────────────────────────────────────────────── + +@catalog_app.callback(invoke_without_command=True) +def catalog_default(ctx: typer.Context): + """Inspect and validate the OpsCatalog.""" + if ctx.invoked_subcommand is None: + typer.echo(ctx.get_help()) + + +@catalog_app.command("list") +def catalog_list( + as_json: bool = typer.Option(False, "--json", help="Output as JSON"), +): + """List all domains with target and bridge counts.""" + cfg = _load_or_exit() + cat = _load_catalog_or_exit(cfg) + + rows = [] + for domain in cat.domains.values(): + target_count = sum(1 for t in cat.targets.values() if t.domain == domain.id) + bridge_count = sum(1 for b in cat.bridges.values() if b.domain == domain.id) + rows.append({ + "domain": domain.id, + "name": domain.name, + "environment": domain.environment, + "targets": target_count, + "bridges": bridge_count, + }) + + if as_json: + typer.echo(json.dumps(rows, indent=2)) + else: + if not rows: + typer.echo("Catalog is empty.") + return + headers = ["DOMAIN", "NAME", "ENV", "TARGETS", "BRIDGES"] + col_widths = [ + max(len(h), max((len(str(r.get(h.lower()[:3] if h == "ENV" else h.lower(), "") or "")) for r in rows), default=0)) + for h in headers + ] + # Manual col widths for cleaner output + col_widths = [ + max(len("DOMAIN"), max((len(r["domain"]) for r in rows), default=0)), + max(len("NAME"), max((len(r["name"]) for r in rows), default=0)), + max(len("ENV"), max((len(r["environment"]) for r in rows), default=0)), + max(len("TARGETS"), max((len(str(r["targets"])) for r in rows), default=0)), + max(len("BRIDGES"), max((len(str(r["bridges"])) for r in rows), default=0)), + ] + + def _fmt(vals): + return " ".join(str(v).ljust(w) for v, w in zip(vals, col_widths)) + + typer.echo(_fmt(headers)) + typer.echo(_fmt(["-" * w for w in col_widths])) + for row in rows: + typer.echo(_fmt([ + row["domain"], row["name"], row["environment"], + str(row["targets"]), str(row["bridges"]), + ])) + + +@catalog_app.command("validate") +def catalog_validate(): + """Validate catalog for consistency errors.""" + from bridge.catalog.validator import validate_catalog + + cfg = _load_or_exit() + cat = _load_catalog_or_exit(cfg) + + errors = validate_catalog(cat) + if errors: + typer.echo(f"Catalog has {len(errors)} violation(s):") + for err in errors: + typer.echo(f" - {err}") + raise typer.Exit(1) + else: + typer.echo(f"Catalog OK — {len(cat.domains)} domain(s), {len(cat.targets)} target(s), {len(cat.bridges)} bridge(s).") + + +@catalog_app.command("show") +def catalog_show( + bridge_id: str = typer.Argument(..., help="Bridge ID"), +): + """Show full metadata for a bridge.""" + cfg = _load_or_exit() + cat = _load_catalog_or_exit(cfg) + + if bridge_id not in cat.bridges: + typer.echo(f"Error: bridge '{bridge_id}' not found in catalog", err=True) + raise typer.Exit(1) + + b = cat.bridges[bridge_id] + typer.echo(f"Bridge: {b.id}") + typer.echo(f"Domain: {b.domain}") + typer.echo(f"Target: {b.target}") + typer.echo(f"Host: {b.host}") + typer.echo(f"Ports: {b.remote_port} -> {b.local_port}") + typer.echo(f"SSH user: {b.ssh_user}") + typer.echo(f"Actor: {b.actor}") + typer.echo(f"Method: {b.access_method}") + if b.description: + typer.echo(f"Description: {b.description}") + if b.health_check: + typer.echo(f"Health: {b.health_check.url} (every {b.health_check.interval_seconds}s)") + + # Domain context + if b.domain in cat.domains: + d = cat.domains[b.domain] + typer.echo(f"\nDomain context: {d.name} [{d.environment}]") + + # Target context + if b.target in cat.targets: + t = cat.targets[b.target] + typer.echo(f"Target: {t.description or t.id} ({t.kind})") diff --git a/src/bridge/config.py b/src/bridge/config.py index 9294815..1000be3 100644 --- a/src/bridge/config.py +++ b/src/bridge/config.py @@ -4,7 +4,7 @@ from __future__ import annotations import os from dataclasses import dataclass from pathlib import Path -from typing import Dict +from typing import Dict, Optional import yaml @@ -19,6 +19,7 @@ class ConfigError(Exception): class BridgeConfig: tunnels: Dict[str, TunnelConfig] actors: Dict[str, ActorInfo] + catalog_path: Optional[Path] = None def _default_config_path() -> Path: @@ -43,7 +44,12 @@ def load_config() -> BridgeConfig: tunnels = _parse_tunnels(raw.get("tunnels") or {}) actors = _parse_actors(raw.get("actors") or {}) - return BridgeConfig(tunnels=tunnels, actors=actors) + + catalog_path = None + if "catalog_path" in raw and raw["catalog_path"]: + catalog_path = Path(os.path.expanduser(str(raw["catalog_path"]))) + + return BridgeConfig(tunnels=tunnels, actors=actors, catalog_path=catalog_path) def _parse_tunnels(raw: dict) -> Dict[str, TunnelConfig]: diff --git a/tests/test_catalog_cli.py b/tests/test_catalog_cli.py new file mode 100644 index 0000000..cd648df --- /dev/null +++ b/tests/test_catalog_cli.py @@ -0,0 +1,203 @@ +"""Tests for catalog CLI commands (targets, catalog list/validate/show).""" +import json +import textwrap +from pathlib import Path + +import pytest +from typer.testing import CliRunner + +from bridge.cli import app + +runner = CliRunner() + +# Config with catalog_path pointing to a fixture +BASE_CONFIG = textwrap.dedent("""\ + tunnels: {{}} + actors: {{}} + catalog_path: {catalog_path} +""") + +CONFIG_NO_CATALOG = textwrap.dedent("""\ + tunnels: {} + actors: {} +""") + + +@pytest.fixture +def catalog_dir(tmp_path): + root = tmp_path / "opscatalog" + domain_dir = root / "domains" / "coulombcore" + (domain_dir / "targets").mkdir(parents=True) + (domain_dir / "bridges").mkdir(parents=True) + actors_dir = root / "actors" + actors_dir.mkdir(parents=True) + + (domain_dir / "domain.yaml").write_text(textwrap.dedent("""\ + type: domain + id: coulombcore + name: CoulombCore Infrastructure + description: Core infra + environment: production + """)) + + (domain_dir / "targets" / "state-hub.yaml").write_text(textwrap.dedent("""\ + type: target + id: state-hub + domain: coulombcore + kind: service + description: State coordination service + reachable_via: + - state-hub-coulombcore + """)) + + (domain_dir / "bridges" / "state-hub-coulombcore.yaml").write_text(textwrap.dedent("""\ + type: bridge + id: state-hub-coulombcore + domain: coulombcore + target: state-hub + description: Ops bridge for state hub + access_method: ssh-reverse + host: coulombcore.local + remote_port: 18000 + local_port: 8000 + ssh_user: ubuntu + ssh_key: ~/.ssh/id_ops + actor: agent.claude-coulombcore + """)) + + (actors_dir / "agents.yaml").write_text(textwrap.dedent("""\ + type: actor + id: agent.claude-coulombcore + class: automation + description: Claude Code agent + """)) + return root + + +@pytest.fixture +def config_file(tmp_path, catalog_dir): + f = tmp_path / "tunnels.yaml" + f.write_text(BASE_CONFIG.format(catalog_path=str(catalog_dir))) + return f + + +@pytest.fixture +def env(config_file, tmp_path): + return { + "BRIDGE_CONFIG": str(config_file), + "BRIDGE_STATE_DIR": str(tmp_path / "state"), + } + + +class TestTargetsCommand: + def test_targets_shows_table(self, env): + result = runner.invoke(app, ["targets"], env=env) + assert result.exit_code == 0 + assert "state-hub" in result.output + + def test_targets_json(self, env): + result = runner.invoke(app, ["targets", "--json"], env=env) + assert result.exit_code == 0 + data = json.loads(result.output) + assert isinstance(data, list) + assert any(t["target"] == "state-hub" for t in data) + assert any(t["domain"] == "coulombcore" for t in data) + + def test_targets_domain_filter(self, env): + result = runner.invoke(app, ["targets", "--domain", "coulombcore"], env=env) + assert result.exit_code == 0 + assert "state-hub" in result.output + + def test_targets_domain_filter_unknown(self, env): + result = runner.invoke(app, ["targets", "--domain", "nonexistent"], env=env) + assert result.exit_code == 0 + # No results but no crash + + def test_targets_no_catalog_configured(self, tmp_path): + f = tmp_path / "tunnels.yaml" + f.write_text(CONFIG_NO_CATALOG) + result = runner.invoke(app, ["targets"], env={"BRIDGE_CONFIG": str(f)}) + assert result.exit_code == 1 + assert "catalog" in result.output.lower() + + def test_targets_show_subcommand(self, env): + result = runner.invoke(app, ["targets", "show", "state-hub"], env=env) + assert result.exit_code == 0 + assert "state-hub" in result.output + assert "coulombcore" in result.output + + def test_targets_show_unknown(self, env): + result = runner.invoke(app, ["targets", "show", "nonexistent"], env=env) + assert result.exit_code == 1 + + +class TestCatalogCommand: + def test_catalog_list(self, env): + result = runner.invoke(app, ["catalog", "list"], env=env) + assert result.exit_code == 0 + assert "coulombcore" in result.output + + def test_catalog_list_json(self, env): + result = runner.invoke(app, ["catalog", "list", "--json"], env=env) + assert result.exit_code == 0 + data = json.loads(result.output) + assert isinstance(data, list) + assert any(d["domain"] == "coulombcore" for d in data) + + def test_catalog_validate_clean(self, env): + result = runner.invoke(app, ["catalog", "validate"], env=env) + assert result.exit_code == 0 + assert "valid" in result.output.lower() or "ok" in result.output.lower() or "0" in result.output + + def test_catalog_validate_with_errors(self, tmp_path): + # Catalog with dangling reference + root = tmp_path / "bad-catalog" + domain_dir = root / "domains" / "d" + (domain_dir / "targets").mkdir(parents=True) + (domain_dir / "domain.yaml").write_text( + "type: domain\nid: d\nname: D\n" + ) + (domain_dir / "targets" / "t.yaml").write_text( + "type: target\nid: t\ndomain: d\nkind: service\nreachable_via:\n - missing-bridge\n" + ) + f = tmp_path / "tunnels.yaml" + f.write_text(BASE_CONFIG.format(catalog_path=str(root))) + result = runner.invoke(app, ["catalog", "validate"], env={"BRIDGE_CONFIG": str(f)}) + assert result.exit_code == 1 + assert "missing-bridge" in result.output + + def test_catalog_show(self, env): + result = runner.invoke(app, ["catalog", "show", "state-hub-coulombcore"], env=env) + assert result.exit_code == 0 + assert "state-hub-coulombcore" in result.output + assert "coulombcore.local" in result.output + + def test_catalog_show_unknown(self, env): + result = runner.invoke(app, ["catalog", "show", "nonexistent"], env=env) + assert result.exit_code == 1 + + def test_catalog_no_catalog_configured(self, tmp_path): + f = tmp_path / "tunnels.yaml" + f.write_text(CONFIG_NO_CATALOG) + result = runner.invoke(app, ["catalog", "list"], env={"BRIDGE_CONFIG": str(f)}) + assert result.exit_code == 1 + + +class TestUpWithCatalogFallback: + def test_up_resolves_catalog_bridge(self, env): + """bridge up works when name not in inline tunnels.yaml.""" + from unittest.mock import MagicMock, patch + + with patch("bridge.cli.TunnelManager") as mock_mgr_cls: + mock_mgr = MagicMock() + mock_mgr.is_running.return_value = False + mock_mgr_cls.return_value = mock_mgr + + result = runner.invoke(app, ["up", "state-hub-coulombcore"], env=env) + + assert result.exit_code == 0 + mock_mgr.start.assert_called_once() + + def test_up_unknown_bridge_exit_1(self, env): + result = runner.invoke(app, ["up", "totally-nonexistent"], env=env) + assert result.exit_code == 1 diff --git a/tests/test_catalog_integration.py b/tests/test_catalog_integration.py new file mode 100644 index 0000000..c2d8480 --- /dev/null +++ b/tests/test_catalog_integration.py @@ -0,0 +1,196 @@ +"""Integration tests for OpsCatalog (T14-T16 from BRIDGE-WP-0002).""" +import json +import textwrap +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest +from typer.testing import CliRunner + +from bridge.catalog.loader import load_catalog +from bridge.catalog.resolver import BridgeNotFound, resolve +from bridge.catalog.validator import validate_catalog +from bridge.cli import app + +runner = CliRunner() + + +@pytest.fixture +def catalog_dir(tmp_path): + root = tmp_path / "opscatalog" + domain_dir = root / "domains" / "coulombcore" + (domain_dir / "targets").mkdir(parents=True) + (domain_dir / "bridges").mkdir(parents=True) + (domain_dir / "docs").mkdir(parents=True) + actors_dir = root / "actors" + actors_dir.mkdir(parents=True) + + (domain_dir / "domain.yaml").write_text(textwrap.dedent("""\ + type: domain + id: coulombcore + name: CoulombCore Infrastructure + description: Core infra + environment: production + """)) + (domain_dir / "targets" / "state-hub.yaml").write_text(textwrap.dedent("""\ + type: target + id: state-hub + domain: coulombcore + kind: service + description: State coordination service + reachable_via: + - state-hub-coulombcore + """)) + (domain_dir / "bridges" / "state-hub-coulombcore.yaml").write_text(textwrap.dedent("""\ + type: bridge + id: state-hub-coulombcore + domain: coulombcore + target: state-hub + description: Ops bridge for state hub + access_method: ssh-reverse + host: coulombcore.local + remote_port: 18000 + local_port: 8000 + ssh_user: ubuntu + ssh_key: ~/.ssh/id_ops + actor: agent.claude-coulombcore + reconnect: + max_attempts: 0 + backoff_initial: 5 + backoff_max: 60 + """)) + (actors_dir / "agents.yaml").write_text(textwrap.dedent("""\ + type: actor + id: agent.claude-coulombcore + class: automation + description: Claude Code agent on CoulombCore + """)) + (domain_dir / "docs" / "overview.md").write_text( + "# CoulombCore Overview\nCore infrastructure notes." + ) + return root + + +@pytest.fixture +def config_with_catalog(tmp_path, catalog_dir): + f = tmp_path / "tunnels.yaml" + f.write_text(textwrap.dedent(f"""\ + catalog_path: {catalog_dir} + tunnels: {{}} + actors: {{}} + """)) + return f + + +@pytest.fixture +def env(config_with_catalog, tmp_path): + return { + "BRIDGE_CONFIG": str(config_with_catalog), + "BRIDGE_STATE_DIR": str(tmp_path / "state"), + } + + +class TestT14CatalogLoadAndResolve: + def test_catalog_loads_all_types(self, catalog_dir): + cat = load_catalog(catalog_dir) + assert "coulombcore" in cat.domains + assert "state-hub" in cat.targets + assert "state-hub-coulombcore" in cat.bridges + assert "agent.claude-coulombcore" in cat.actors + + def test_resolve_from_catalog(self, catalog_dir): + cat = load_catalog(catalog_dir) + tc = resolve("state-hub-coulombcore", catalog=cat, inline_tunnels={}) + assert tc.name == "state-hub-coulombcore" + assert tc.host == "coulombcore.local" + assert tc.remote_port == 18000 + + def test_bridge_up_with_catalog_bridge(self, env): + with patch("bridge.cli.TunnelManager") as mock_mgr_cls: + mock_mgr = MagicMock() + mock_mgr.is_running.return_value = False + mock_mgr_cls.return_value = mock_mgr + + result = runner.invoke(app, ["up", "state-hub-coulombcore"], env=env) + + assert result.exit_code == 0 + mock_mgr.start.assert_called_once() + # Verify TunnelManager was constructed with correct config + call_args = mock_mgr_cls.call_args + tcfg = call_args[0][0] + assert tcfg.host == "coulombcore.local" + assert tcfg.remote_port == 18000 + + +class TestT15BridgeTargetsOutput: + def test_targets_table(self, env): + result = runner.invoke(app, ["targets"], env=env) + assert result.exit_code == 0 + assert "state-hub" in result.output + assert "coulombcore" in result.output + assert "service" in result.output + + def test_targets_json_structure(self, env): + result = runner.invoke(app, ["targets", "--json"], env=env) + assert result.exit_code == 0 + data = json.loads(result.output) + assert len(data) == 1 + t = data[0] + assert t["target"] == "state-hub" + assert t["domain"] == "coulombcore" + assert t["kind"] == "service" + assert "state-hub-coulombcore" in t["bridges"] + + def test_targets_show_includes_docs(self, env): + result = runner.invoke(app, ["targets", "show", "state-hub"], env=env) + assert result.exit_code == 0 + assert "state-hub" in result.output + assert "coulombcore" in result.output + + +class TestT16CatalogValidate: + def test_validate_clean_catalog_exit_0(self, env): + result = runner.invoke(app, ["catalog", "validate"], env=env) + assert result.exit_code == 0 + assert "ok" in result.output.lower() or "0" in result.output + + def test_validate_dangling_reference_exit_1(self, tmp_path): + root = tmp_path / "bad" + domain_dir = root / "domains" / "d" + (domain_dir / "targets").mkdir(parents=True) + (domain_dir / "bridges").mkdir(parents=True) + (root / "actors").mkdir(parents=True) + + (domain_dir / "domain.yaml").write_text("type: domain\nid: d\nname: D\n") + (domain_dir / "targets" / "t.yaml").write_text( + "type: target\nid: t\ndomain: d\nkind: service\n" + "reachable_via:\n - nonexistent-bridge\n" + ) + (domain_dir / "bridges" / "b.yaml").write_text( + "type: bridge\nid: b\ndomain: d\ntarget: t\n" + "host: h\nremote_port: 1\nlocal_port: 2\n" + "ssh_user: u\nssh_key: k\nactor: missing-actor\n" + ) + + f = tmp_path / "tunnels.yaml" + f.write_text(f"catalog_path: {root}\ntunnels: {{}}\nactors: {{}}\n") + + result = runner.invoke(app, ["catalog", "validate"], env={"BRIDGE_CONFIG": str(f)}) + assert result.exit_code == 1 + assert "nonexistent-bridge" in result.output or "missing-actor" in result.output + + def test_catalog_list_shows_counts(self, env): + result = runner.invoke(app, ["catalog", "list"], env=env) + assert result.exit_code == 0 + assert "coulombcore" in result.output + + def test_catalog_show_bridge(self, env): + result = runner.invoke(app, ["catalog", "show", "state-hub-coulombcore"], env=env) + assert result.exit_code == 0 + assert "coulombcore.local" in result.output + assert "18000" in result.output + + def test_validate_using_validator_directly(self, catalog_dir): + cat = load_catalog(catalog_dir) + errors = validate_catalog(cat) + assert errors == [] diff --git a/tests/test_catalog_loader.py b/tests/test_catalog_loader.py new file mode 100644 index 0000000..037ad70 --- /dev/null +++ b/tests/test_catalog_loader.py @@ -0,0 +1,141 @@ +"""Tests for catalog loader.""" +import textwrap +from pathlib import Path + +import pytest + +from bridge.catalog.loader import CatalogLoadError, load_catalog +from bridge.catalog.models import Catalog + + +@pytest.fixture +def catalog_dir(tmp_path): + """Build a minimal valid catalog fixture.""" + root = tmp_path / "opscatalog" + domain_dir = root / "domains" / "coulombcore" + (domain_dir / "targets").mkdir(parents=True) + (domain_dir / "bridges").mkdir(parents=True) + (domain_dir / "docs").mkdir(parents=True) + actors_dir = root / "actors" + actors_dir.mkdir(parents=True) + + (domain_dir / "domain.yaml").write_text(textwrap.dedent("""\ + type: domain + id: coulombcore + name: CoulombCore Infrastructure + description: Core infra + environment: production + """)) + + (domain_dir / "targets" / "state-hub.yaml").write_text(textwrap.dedent("""\ + type: target + id: state-hub + domain: coulombcore + kind: service + description: State coordination service + reachable_via: + - state-hub-coulombcore + """)) + + (domain_dir / "bridges" / "state-hub-coulombcore.yaml").write_text(textwrap.dedent("""\ + type: bridge + id: state-hub-coulombcore + domain: coulombcore + target: state-hub + description: Ops bridge + access_method: ssh-reverse + host: coulombcore.local + remote_port: 18000 + local_port: 8000 + ssh_user: ubuntu + ssh_key: ~/.ssh/id_ops + actor: agent.claude-coulombcore + health_check: + url: http://127.0.0.1:18000/health + interval_seconds: 30 + timeout_seconds: 5 + reconnect: + max_attempts: 0 + backoff_initial: 5 + backoff_max: 60 + """)) + + (actors_dir / "agents.yaml").write_text(textwrap.dedent("""\ + type: actor + id: agent.claude-coulombcore + class: automation + description: Claude Code agent on CoulombCore + """)) + + (domain_dir / "docs" / "overview.md").write_text("# Overview\nSome ops notes.") + + return root + + +class TestLoadCatalog: + def test_loads_domain(self, catalog_dir): + cat = load_catalog(catalog_dir) + assert "coulombcore" in cat.domains + d = cat.domains["coulombcore"] + assert d.name == "CoulombCore Infrastructure" + assert d.environment == "production" + + def test_loads_target(self, catalog_dir): + cat = load_catalog(catalog_dir) + assert "state-hub" in cat.targets + t = cat.targets["state-hub"] + assert t.domain == "coulombcore" + assert t.kind == "service" + assert "state-hub-coulombcore" in t.reachable_via + + def test_loads_bridge(self, catalog_dir): + cat = load_catalog(catalog_dir) + assert "state-hub-coulombcore" in cat.bridges + b = cat.bridges["state-hub-coulombcore"] + assert b.host == "coulombcore.local" + assert b.remote_port == 18000 + assert b.health_check is not None + assert b.health_check.url == "http://127.0.0.1:18000/health" + assert b.reconnect is not None + assert b.reconnect.max_attempts == 0 + + def test_loads_actor(self, catalog_dir): + cat = load_catalog(catalog_dir) + assert "agent.claude-coulombcore" in cat.actors + a = cat.actors["agent.claude-coulombcore"] + assert a.actor_class == "automation" + + def test_unknown_type_skipped(self, catalog_dir): + (catalog_dir / "domains" / "coulombcore" / "unknown.yaml").write_text( + "type: mystery\nid: x\n" + ) + # Should not raise + cat = load_catalog(catalog_dir) + assert isinstance(cat, Catalog) + + def test_empty_catalog_dir(self, tmp_path): + root = tmp_path / "empty" + root.mkdir() + cat = load_catalog(root) + assert cat.domains == {} + assert cat.bridges == {} + + def test_missing_required_field_raises(self, tmp_path): + root = tmp_path / "bad" + domain_dir = root / "domains" / "x" + domain_dir.mkdir(parents=True) + (domain_dir / "domain.yaml").write_text("type: domain\nname: X\n") + with pytest.raises(CatalogLoadError, match="id"): + load_catalog(root) + + def test_nonexistent_path_raises(self, tmp_path): + with pytest.raises(CatalogLoadError, match="not found"): + load_catalog(tmp_path / "nonexistent") + + def test_invalid_yaml_raises(self, tmp_path): + root = tmp_path / "bad" + domain_dir = root / "domains" / "x" + domain_dir.mkdir(parents=True) + (domain_dir / "domain.yaml").write_text("type: domain\n[\nbad: yaml") + with pytest.raises(CatalogLoadError): + load_catalog(root) diff --git a/tests/test_catalog_models.py b/tests/test_catalog_models.py new file mode 100644 index 0000000..f04cbab --- /dev/null +++ b/tests/test_catalog_models.py @@ -0,0 +1,116 @@ +"""Tests for catalog domain models.""" +import pytest +from bridge.catalog.models import ( + ActorClass, + Catalog, + CatalogBridge, + CatalogDomain, + CatalogTarget, +) + + +class TestCatalogDomain: + def test_required_fields(self): + d = CatalogDomain(id="coulombcore", name="CoulombCore Infra") + assert d.id == "coulombcore" + assert d.name == "CoulombCore Infra" + + def test_optional_fields_default(self): + d = CatalogDomain(id="x", name="X") + assert d.description == "" + assert d.environment == "" + + +class TestCatalogTarget: + def test_required_fields(self): + t = CatalogTarget(id="state-hub", domain="coulombcore", kind="service") + assert t.id == "state-hub" + assert t.domain == "coulombcore" + assert t.kind == "service" + + def test_reachable_via_defaults_empty(self): + t = CatalogTarget(id="t", domain="d", kind="service") + assert t.reachable_via == [] + + def test_reachable_via(self): + t = CatalogTarget(id="t", domain="d", kind="service", reachable_via=["b1", "b2"]) + assert t.reachable_via == ["b1", "b2"] + + +class TestCatalogBridge: + def test_required_fields(self): + b = CatalogBridge( + id="state-hub-coulombcore", + domain="coulombcore", + target="state-hub", + host="coulombcore.local", + remote_port=18000, + local_port=8000, + ssh_user="ubuntu", + ssh_key="~/.ssh/id_ops", + actor="agent.claude-coulombcore", + ) + assert b.id == "state-hub-coulombcore" + assert b.domain == "coulombcore" + assert b.host == "coulombcore.local" + + def test_optional_fields_default(self): + b = CatalogBridge( + id="b", + domain="d", + target="t", + host="h", + remote_port=1, + local_port=2, + ssh_user="u", + ssh_key="k", + actor="a", + ) + assert b.description == "" + assert b.access_method == "ssh-reverse" + assert b.health_check is None + assert b.reconnect is None + + def test_to_tunnel_config(self): + from bridge.models import TunnelConfig + b = CatalogBridge( + id="state-hub-coulombcore", + domain="coulombcore", + target="state-hub", + host="coulombcore.local", + remote_port=18000, + local_port=8000, + ssh_user="ubuntu", + ssh_key="~/.ssh/id_ops", + actor="agent.claude-coulombcore", + ) + tc = b.to_tunnel_config() + assert isinstance(tc, TunnelConfig) + assert tc.name == "state-hub-coulombcore" + assert tc.host == "coulombcore.local" + assert tc.remote_port == 18000 + + +class TestActorClass: + def test_fields(self): + a = ActorClass(id="agent.claude", actor_class="automation", description="Claude agent") + assert a.id == "agent.claude" + assert a.actor_class == "automation" + + def test_optional_description(self): + a = ActorClass(id="x", actor_class="human") + assert a.description == "" + + +class TestCatalog: + def test_empty_catalog(self): + c = Catalog() + assert c.domains == {} + assert c.targets == {} + assert c.bridges == {} + assert c.actors == {} + + def test_add_entries(self): + c = Catalog() + c.domains["d"] = CatalogDomain(id="d", name="D") + assert "d" in c.domains diff --git a/tests/test_catalog_resolver.py b/tests/test_catalog_resolver.py new file mode 100644 index 0000000..8ec9add --- /dev/null +++ b/tests/test_catalog_resolver.py @@ -0,0 +1,89 @@ +"""Tests for catalog resolver.""" +import pytest +from bridge.catalog.models import ( + ActorClass, + Catalog, + CatalogBridge, + CatalogDomain, + CatalogTarget, +) +from bridge.catalog.resolver import BridgeNotFound, resolve +from bridge.models import TunnelConfig, ReconnectPolicy + + +@pytest.fixture +def catalog(): + cat = Catalog() + cat.domains["d"] = CatalogDomain(id="d", name="D") + cat.targets["t"] = CatalogTarget(id="t", domain="d", kind="service") + cat.bridges["catalog-bridge"] = CatalogBridge( + id="catalog-bridge", + domain="d", + target="t", + host="catalog-host.local", + remote_port=19000, + local_port=9000, + ssh_user="ubuntu", + ssh_key="~/.ssh/catalog", + actor="operator.bernd", + ) + cat.actors["operator.bernd"] = ActorClass(id="operator.bernd", actor_class="human") + return cat + + +@pytest.fixture +def inline_tunnels(): + return { + "inline-bridge": TunnelConfig( + name="inline-bridge", + host="inline-host.local", + remote_port=18000, + local_port=8000, + ssh_user="ubuntu", + ssh_key="~/.ssh/inline", + actor="operator.bernd", + ) + } + + +class TestResolve: + def test_inline_takes_precedence(self, catalog, inline_tunnels): + tc = resolve("inline-bridge", catalog=catalog, inline_tunnels=inline_tunnels) + assert tc.host == "inline-host.local" + + def test_catalog_fallback(self, catalog, inline_tunnels): + tc = resolve("catalog-bridge", catalog=catalog, inline_tunnels=inline_tunnels) + assert tc.host == "catalog-host.local" + assert tc.remote_port == 19000 + + def test_catalog_fallback_no_inline(self, catalog): + tc = resolve("catalog-bridge", catalog=catalog, inline_tunnels={}) + assert tc.name == "catalog-bridge" + + def test_missing_name_raises(self, catalog, inline_tunnels): + with pytest.raises(BridgeNotFound, match="nonexistent"): + resolve("nonexistent", catalog=catalog, inline_tunnels=inline_tunnels) + + def test_missing_name_no_catalog_raises(self, inline_tunnels): + with pytest.raises(BridgeNotFound): + resolve("nonexistent", catalog=None, inline_tunnels=inline_tunnels) + + def test_inline_bridge_returns_tunnel_config(self, catalog, inline_tunnels): + tc = resolve("inline-bridge", catalog=catalog, inline_tunnels=inline_tunnels) + assert isinstance(tc, TunnelConfig) + + def test_catalog_bridge_returns_tunnel_config(self, catalog): + tc = resolve("catalog-bridge", catalog=catalog, inline_tunnels={}) + assert isinstance(tc, TunnelConfig) + + def test_catalog_is_none_no_inline_raises(self): + with pytest.raises(BridgeNotFound): + resolve("any-name", catalog=None, inline_tunnels={}) + + def test_resolve_preserves_reconnect_policy(self, catalog): + from bridge.models import ReconnectPolicy + catalog.bridges["catalog-bridge"].reconnect = ReconnectPolicy( + max_attempts=3, backoff_initial=2, backoff_max=30 + ) + tc = resolve("catalog-bridge", catalog=catalog, inline_tunnels={}) + assert tc.reconnect.max_attempts == 3 diff --git a/tests/test_catalog_validator.py b/tests/test_catalog_validator.py new file mode 100644 index 0000000..1832ef7 --- /dev/null +++ b/tests/test_catalog_validator.py @@ -0,0 +1,94 @@ +"""Tests for catalog validator.""" +import pytest +from bridge.catalog.models import ( + ActorClass, + Catalog, + CatalogBridge, + CatalogDomain, + CatalogTarget, +) +from bridge.catalog.validator import ValidationError, validate_catalog + + +def _make_full_catalog() -> Catalog: + cat = Catalog() + cat.domains["coulombcore"] = CatalogDomain(id="coulombcore", name="CoulombCore") + cat.targets["state-hub"] = CatalogTarget( + id="state-hub", + domain="coulombcore", + kind="service", + reachable_via=["state-hub-coulombcore"], + ) + cat.bridges["state-hub-coulombcore"] = CatalogBridge( + id="state-hub-coulombcore", + domain="coulombcore", + target="state-hub", + host="host.local", + remote_port=18000, + local_port=8000, + ssh_user="ubuntu", + ssh_key="~/.ssh/id_ops", + actor="agent.claude-coulombcore", + ) + cat.actors["agent.claude-coulombcore"] = ActorClass( + id="agent.claude-coulombcore", + actor_class="automation", + ) + return cat + + +class TestValidateCatalog: + def test_valid_catalog_no_errors(self): + cat = _make_full_catalog() + errors = validate_catalog(cat) + assert errors == [] + + def test_target_domain_must_exist(self): + cat = _make_full_catalog() + cat.targets["orphan"] = CatalogTarget( + id="orphan", domain="nonexistent-domain", kind="service" + ) + errors = validate_catalog(cat) + assert any("orphan" in e and "nonexistent-domain" in e for e in errors) + + def test_target_reachable_via_must_exist(self): + cat = _make_full_catalog() + cat.targets["state-hub"].reachable_via.append("nonexistent-bridge") + errors = validate_catalog(cat) + assert any("nonexistent-bridge" in e for e in errors) + + def test_bridge_domain_must_exist(self): + cat = _make_full_catalog() + cat.bridges["state-hub-coulombcore"].domain = "missing-domain" + errors = validate_catalog(cat) + assert any("missing-domain" in e for e in errors) + + def test_bridge_target_must_exist(self): + cat = _make_full_catalog() + cat.bridges["state-hub-coulombcore"].target = "missing-target" + errors = validate_catalog(cat) + assert any("missing-target" in e for e in errors) + + def test_bridge_actor_must_exist(self): + cat = _make_full_catalog() + cat.bridges["state-hub-coulombcore"].actor = "nonexistent-actor" + errors = validate_catalog(cat) + assert any("nonexistent-actor" in e for e in errors) + + def test_multiple_errors_all_reported(self): + cat = Catalog() + # Target with dangling domain and reachable_via + cat.targets["t1"] = CatalogTarget( + id="t1", domain="missing", kind="service", reachable_via=["missing-bridge"] + ) + # Bridge with dangling domain + target + actor + cat.bridges["b1"] = CatalogBridge( + id="b1", domain="missing", target="missing", host="h", + remote_port=1, local_port=2, ssh_user="u", ssh_key="k", actor="missing-actor", + ) + errors = validate_catalog(cat) + assert len(errors) >= 4 + + def test_empty_catalog_is_valid(self): + cat = Catalog() + assert validate_catalog(cat) == []