Compare commits

...

3 Commits

Author SHA1 Message Date
baee28eda2 chore: add Claude Code project settings
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 02:10:14 +00:00
91d031ae20 feat: implement OpsCatalog extension (BRIDGE-WP-0002)
Adds the OpsCatalog subsystem: a Git-backed YAML catalog of operations
domains, targets, bridges, and actor classes. Includes catalog loader,
cross-reference validator, bridge resolver (inline-first, catalog
fallback), and new CLI commands: `bridge targets`, `bridge targets show`,
`bridge catalog list/validate/show`. Updates `up/down/restart` to resolve
bridge names from the catalog when not defined inline. 142 tests, all green.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 02:05:06 +00:00
a7eaf59ced feat: implement OpsBridge CLI (BRIDGE-WP-0001)
Full TDD implementation of the `bridge` CLI tool covering all phases
from BRIDGE-WP-0001: project scaffolding, config loading, state
management, audit logging, health checks, tunnel lifecycle manager, and
all CLI commands (up/down/restart/status/logs). 77 tests, all green.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 01:40:08 +00:00
30 changed files with 3193 additions and 0 deletions

5
.claude/settings.json Normal file
View File

@@ -0,0 +1,5 @@
{
"enabledPlugins": {
"commit-commands@claude-plugins-official": true
}
}

34
pyproject.toml Normal file
View File

@@ -0,0 +1,34 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "ops-bridge"
version = "0.1.0"
description = "SSH reverse tunnel lifecycle manager"
requires-python = ">=3.11"
dependencies = [
"typer>=0.12",
"pyyaml>=6.0",
"httpx>=0.27",
]
[project.scripts]
bridge = "bridge.cli:app"
[tool.hatch.build.targets.wheel]
packages = ["src/bridge"]
[tool.pytest.ini_options]
testpaths = ["tests"]
pythonpath = ["src"]
[tool.ruff]
line-length = 88
[dependency-groups]
dev = [
"pytest>=8.0",
"pytest-asyncio>=0.23",
"ruff>=0.4",
]

0
src/bridge/__init__.py Normal file
View File

65
src/bridge/audit.py Normal file
View File

@@ -0,0 +1,65 @@
"""Audit logging for OpsBridge lifecycle events."""
from __future__ import annotations
import json
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional
class AuditEvent(str, Enum):
BRIDGE_STARTED = "bridge_started"
BRIDGE_CONNECTED = "bridge_connected"
BRIDGE_DISCONNECTED = "bridge_disconnected"
BRIDGE_RECONNECTING = "bridge_reconnecting"
HEALTH_CHECK_FAILED = "health_check_failed"
HEALTH_CHECK_RECOVERED = "health_check_recovered"
BRIDGE_STOPPED = "bridge_stopped"
def _default_state_dir() -> Path:
return Path.home() / ".local" / "state" / "bridge"
class AuditLogger:
def __init__(self, state_dir: Optional[Path] = None):
self._dir = Path(state_dir) if state_dir else _default_state_dir()
def _log_path(self, tunnel: str) -> Path:
return self._dir / f"{tunnel}.log"
def log(
self,
tunnel: str,
event: AuditEvent,
actor: str,
actor_class: str,
detail: str = "",
) -> None:
self._dir.mkdir(parents=True, exist_ok=True)
entry: Dict[str, Any] = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"tunnel": tunnel,
"actor": actor,
"actor_class": actor_class,
"event": event.value,
}
if detail:
entry["detail"] = detail
with self._log_path(tunnel).open("a") as f:
f.write(json.dumps(entry) + "\n")
def read_events(self, tunnel: str) -> List[Dict[str, Any]]:
path = self._log_path(tunnel)
if not path.exists():
return []
events = []
for line in path.read_text().splitlines():
line = line.strip()
if line:
try:
events.append(json.loads(line))
except json.JSONDecodeError:
pass
return events

View File

View File

@@ -0,0 +1,142 @@
"""Catalog loader — walks a catalog directory tree and parses YAML files."""
from __future__ import annotations
import logging
import warnings
from pathlib import Path
from typing import Any, Dict
import yaml
from bridge.catalog.models import (
ActorClass,
Catalog,
CatalogBridge,
CatalogDomain,
CatalogTarget,
)
from bridge.models import HealthCheckConfig, ReconnectPolicy
log = logging.getLogger(__name__)
class CatalogLoadError(Exception):
"""Raised when catalog loading fails."""
def load_catalog(path: Path) -> Catalog:
"""Walk the catalog directory and return a populated Catalog."""
path = Path(path)
if not path.exists():
raise CatalogLoadError(f"Catalog path not found: {path}")
catalog = Catalog()
for yaml_file in sorted(path.rglob("*.yaml")):
_load_file(yaml_file, catalog)
return catalog
def _load_file(path: Path, catalog: Catalog) -> None:
try:
with path.open() as f:
data = yaml.safe_load(f)
except yaml.YAMLError as e:
raise CatalogLoadError(f"Invalid YAML in {path}: {e}") from e
if not isinstance(data, dict):
log.warning("Skipping %s: not a YAML mapping", path)
return
entry_type = data.get("type")
if not entry_type:
log.warning("Skipping %s: no 'type' field", path)
return
try:
if entry_type == "domain":
entry = _parse_domain(data, path)
catalog.domains[entry.id] = entry
elif entry_type == "target":
entry = _parse_target(data, path)
catalog.targets[entry.id] = entry
elif entry_type == "bridge":
entry = _parse_bridge(data, path)
catalog.bridges[entry.id] = entry
elif entry_type == "actor":
entry = _parse_actor(data, path)
catalog.actors[entry.id] = entry
else:
log.warning("Skipping %s: unknown type '%s'", path, entry_type)
except CatalogLoadError:
raise
except Exception as e:
raise CatalogLoadError(f"Error parsing {path}: {e}") from e
def _require(data: dict, field: str, path: Path) -> Any:
if field not in data:
raise CatalogLoadError(f"Missing required field '{field}' in {path}")
return data[field]
def _parse_domain(data: dict, path: Path) -> CatalogDomain:
return CatalogDomain(
id=str(_require(data, "id", path)),
name=str(_require(data, "name", path)),
description=str(data.get("description", "")),
environment=str(data.get("environment", "")),
)
def _parse_target(data: dict, path: Path) -> CatalogTarget:
return CatalogTarget(
id=str(_require(data, "id", path)),
domain=str(_require(data, "domain", path)),
kind=str(_require(data, "kind", path)),
description=str(data.get("description", "")),
reachable_via=list(data.get("reachable_via") or []),
)
def _parse_bridge(data: dict, path: Path) -> CatalogBridge:
health_check = None
if "health_check" in data and data["health_check"]:
hc = data["health_check"]
health_check = HealthCheckConfig(
url=str(_require(hc, "url", path)),
interval_seconds=int(hc.get("interval_seconds", 30)),
timeout_seconds=int(hc.get("timeout_seconds", 5)),
)
reconnect = None
if "reconnect" in data and data["reconnect"]:
r = data["reconnect"]
reconnect = ReconnectPolicy(
max_attempts=int(r.get("max_attempts", 0)),
backoff_initial=int(r.get("backoff_initial", 5)),
backoff_max=int(r.get("backoff_max", 60)),
)
return CatalogBridge(
id=str(_require(data, "id", path)),
domain=str(_require(data, "domain", path)),
target=str(_require(data, "target", path)),
host=str(_require(data, "host", path)),
remote_port=int(_require(data, "remote_port", path)),
local_port=int(_require(data, "local_port", path)),
ssh_user=str(_require(data, "ssh_user", path)),
ssh_key=str(_require(data, "ssh_key", path)),
actor=str(_require(data, "actor", path)),
description=str(data.get("description", "")),
access_method=str(data.get("access_method", "ssh-reverse")),
health_check=health_check,
reconnect=reconnect,
)
def _parse_actor(data: dict, path: Path) -> ActorClass:
return ActorClass(
id=str(_require(data, "id", path)),
actor_class=str(_require(data, "class", path)),
description=str(data.get("description", "")),
)

View File

@@ -0,0 +1,69 @@
"""Domain models for OpsCatalog."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from bridge.models import HealthCheckConfig, ReconnectPolicy, TunnelConfig
@dataclass
class CatalogDomain:
id: str
name: str
description: str = ""
environment: str = ""
@dataclass
class CatalogTarget:
id: str
domain: str
kind: str
description: str = ""
reachable_via: List[str] = field(default_factory=list)
@dataclass
class CatalogBridge:
id: str
domain: str
target: str
host: str
remote_port: int
local_port: int
ssh_user: str
ssh_key: str
actor: str
description: str = ""
access_method: str = "ssh-reverse"
health_check: Optional[HealthCheckConfig] = None
reconnect: Optional[ReconnectPolicy] = None
def to_tunnel_config(self) -> TunnelConfig:
return TunnelConfig(
name=self.id,
host=self.host,
remote_port=self.remote_port,
local_port=self.local_port,
ssh_user=self.ssh_user,
ssh_key=self.ssh_key,
actor=self.actor,
reconnect=self.reconnect if self.reconnect is not None else ReconnectPolicy(),
health_check=self.health_check,
)
@dataclass
class ActorClass:
id: str
actor_class: str
description: str = ""
@dataclass
class Catalog:
domains: Dict[str, CatalogDomain] = field(default_factory=dict)
targets: Dict[str, CatalogTarget] = field(default_factory=dict)
bridges: Dict[str, CatalogBridge] = field(default_factory=dict)
actors: Dict[str, ActorClass] = field(default_factory=dict)

View File

@@ -0,0 +1,35 @@
"""Catalog resolver — resolves a bridge name to a TunnelConfig."""
from __future__ import annotations
from typing import Dict, Optional
from bridge.catalog.models import Catalog
from bridge.models import TunnelConfig
class BridgeNotFound(Exception):
"""Raised when a bridge name cannot be resolved from inline config or catalog."""
def resolve(
name: str,
catalog: Optional[Catalog],
inline_tunnels: Dict[str, TunnelConfig],
) -> TunnelConfig:
"""Resolve bridge name to TunnelConfig.
Lookup order:
1. inline_tunnels (from tunnels.yaml) — wins if present
2. catalog bridges — fallback
3. raises BridgeNotFound if neither has the name
"""
if name in inline_tunnels:
return inline_tunnels[name]
if catalog is not None and name in catalog.bridges:
return catalog.bridges[name].to_tunnel_config()
raise BridgeNotFound(
f"Bridge '{name}' not found in inline config"
+ (" or catalog" if catalog is not None else " (no catalog configured)")
)

View File

@@ -0,0 +1,42 @@
"""Catalog validator — cross-reference checks for catalog consistency."""
from __future__ import annotations
from typing import List
from bridge.catalog.models import Catalog
class ValidationError(Exception):
"""Raised when catalog validation fails (used for programmatic access)."""
def validate_catalog(catalog: Catalog) -> List[str]:
"""Return a list of validation error strings (empty = valid)."""
errors: List[str] = []
for target in catalog.targets.values():
if target.domain not in catalog.domains:
errors.append(
f"Target '{target.id}': domain '{target.domain}' does not exist in catalog"
)
for bridge_id in target.reachable_via:
if bridge_id not in catalog.bridges:
errors.append(
f"Target '{target.id}': reachable_via references unknown bridge '{bridge_id}'"
)
for bridge in catalog.bridges.values():
if bridge.domain not in catalog.domains:
errors.append(
f"Bridge '{bridge.id}': domain '{bridge.domain}' does not exist in catalog"
)
if bridge.target not in catalog.targets:
errors.append(
f"Bridge '{bridge.id}': target '{bridge.target}' does not exist in catalog"
)
if bridge.actor not in catalog.actors:
errors.append(
f"Bridge '{bridge.id}': actor '{bridge.actor}' does not exist in catalog"
)
return errors

471
src/bridge/cli.py Normal file
View File

@@ -0,0 +1,471 @@
"""CLI for OpsBridge — bridge command."""
from __future__ import annotations
import json
import os
from pathlib import Path
from typing import Optional
import typer
from bridge.audit import AuditLogger
from bridge.config import ConfigError, load_config
from bridge.manager import TunnelManager
from bridge.models import BridgeState
from bridge.state import StateManager
app = typer.Typer(
name="bridge",
help="OpsBridge — SSH reverse tunnel lifecycle manager.",
no_args_is_help=True,
)
targets_app = typer.Typer(help="Inspect infrastructure targets from the OpsCatalog.")
catalog_app = typer.Typer(help="Inspect and validate the OpsCatalog.")
app.add_typer(targets_app, name="targets")
app.add_typer(catalog_app, name="catalog")
def _state_dir() -> Path:
return Path(os.environ.get("BRIDGE_STATE_DIR", str(Path.home() / ".local" / "state" / "bridge")))
def _load_or_exit():
try:
return load_config()
except ConfigError as e:
typer.echo(f"Error: {e}", err=True)
raise typer.Exit(1)
def _load_catalog_or_exit(cfg):
from bridge.catalog.loader import CatalogLoadError, load_catalog
if cfg.catalog_path is None:
typer.echo("Error: catalog_path not configured in tunnels.yaml", err=True)
raise typer.Exit(1)
try:
return load_catalog(cfg.catalog_path)
except Exception as e:
typer.echo(f"Error loading catalog: {e}", err=True)
raise typer.Exit(1)
def _resolve_tunnel(cfg, name: str):
"""Resolve tunnel name: inline first, then catalog, then error."""
from bridge.catalog.loader import load_catalog
from bridge.catalog.resolver import BridgeNotFound, resolve
catalog = None
if cfg.catalog_path is not None:
try:
catalog = load_catalog(cfg.catalog_path)
except Exception:
pass
try:
return resolve(name, catalog=catalog, inline_tunnels=cfg.tunnels)
except BridgeNotFound:
typer.echo(f"Error: tunnel '{name}' not found in config or catalog", err=True)
raise typer.Exit(1)
def _all_tunnel_names(cfg):
"""Return names from inline config (all-tunnels operations use inline only)."""
return list(cfg.tunnels.keys())
# ─── Tunnel lifecycle commands ────────────────────────────────────────────────
@app.command()
def up(
tunnel: Optional[str] = typer.Argument(None, help="Tunnel name (omit for all inline)"),
):
"""Start one or all tunnels."""
cfg = _load_or_exit()
sd = _state_dir()
if tunnel:
tcfg = _resolve_tunnel(cfg, tunnel)
mgr = TunnelManager(tcfg, state_dir=sd)
if mgr.is_running():
typer.echo(f"Tunnel '{tunnel}' is already running.")
raise typer.Exit(2)
mgr.start()
typer.echo(f"Started tunnel '{tunnel}'.")
else:
names = _all_tunnel_names(cfg)
any_already_running = False
for name in names:
tcfg = cfg.tunnels[name]
mgr = TunnelManager(tcfg, state_dir=sd)
if mgr.is_running():
typer.echo(f"Tunnel '{name}' is already running.")
any_already_running = True
else:
mgr.start()
typer.echo(f"Started tunnel '{name}'.")
if any_already_running and len(names) == 1:
raise typer.Exit(2)
@app.command()
def down(
tunnel: Optional[str] = typer.Argument(None, help="Tunnel name (omit for all inline)"),
):
"""Stop one or all tunnels."""
cfg = _load_or_exit()
sd = _state_dir()
if tunnel:
tcfg = _resolve_tunnel(cfg, tunnel)
mgr = TunnelManager(tcfg, state_dir=sd)
if not mgr.is_running():
typer.echo(f"Tunnel '{tunnel}' is not running.")
raise typer.Exit(2)
mgr.stop()
typer.echo(f"Stopped tunnel '{tunnel}'.")
else:
names = _all_tunnel_names(cfg)
any_not_running = False
for name in names:
tcfg = cfg.tunnels[name]
mgr = TunnelManager(tcfg, state_dir=sd)
if not mgr.is_running():
typer.echo(f"Tunnel '{name}' is not running.")
any_not_running = True
else:
mgr.stop()
typer.echo(f"Stopped tunnel '{name}'.")
if any_not_running and len(names) == 1:
raise typer.Exit(2)
@app.command()
def restart(
tunnel: Optional[str] = typer.Argument(None, help="Tunnel name (omit for all inline)"),
):
"""Restart one or all tunnels."""
cfg = _load_or_exit()
sd = _state_dir()
if tunnel:
tcfg = _resolve_tunnel(cfg, tunnel)
mgr = TunnelManager(tcfg, state_dir=sd)
mgr.stop()
mgr.start()
typer.echo(f"Restarted tunnel '{tunnel}'.")
else:
for name in _all_tunnel_names(cfg):
tcfg = cfg.tunnels[name]
mgr = TunnelManager(tcfg, state_dir=sd)
mgr.stop()
mgr.start()
typer.echo(f"Restarted tunnel '{name}'.")
@app.command()
def status(
as_json: bool = typer.Option(False, "--json", help="Output as JSON"),
):
"""Show status of all tunnels."""
cfg = _load_or_exit()
sd = _state_dir()
state_mgr = StateManager(state_dir=sd)
rows = []
for name, tcfg in cfg.tunnels.items():
state = state_mgr.read_state(name)
pid = state_mgr.read_pid(name)
rows.append({
"tunnel": name,
"state": state.value,
"actor": tcfg.actor,
"host": tcfg.host,
"pid": pid,
"uptime": None,
"health": None,
})
if as_json:
typer.echo(json.dumps(rows, indent=2))
else:
_print_status_table(rows)
def _print_status_table(rows):
if not rows:
typer.echo("No tunnels configured.")
return
headers = ["TUNNEL", "STATE", "ACTOR", "HOST", "PID"]
col_widths = [
max(len(h), max((len(str(r.get(h.lower(), "") or "")) for r in rows), default=0))
for h in headers
]
def _fmt_row(vals):
return " ".join(str(v).ljust(w) for v, w in zip(vals, col_widths))
typer.echo(_fmt_row(headers))
typer.echo(_fmt_row(["-" * w for w in col_widths]))
for row in rows:
typer.echo(_fmt_row([
row["tunnel"],
row["state"],
row["actor"],
row["host"],
str(row["pid"] or ""),
]))
@app.command()
def logs(
tunnel: str = typer.Argument(..., help="Tunnel name"),
lines: int = typer.Option(50, "--lines", "-n", help="Number of lines to show"),
follow: bool = typer.Option(False, "--follow", "-f", help="Follow the log"),
):
"""Show audit log for a tunnel."""
cfg = _load_or_exit()
_resolve_tunnel(cfg, tunnel) # validate name
sd = _state_dir()
logger = AuditLogger(state_dir=sd)
events = logger.read_events(tunnel)
if not events:
typer.echo(f"No log entries for tunnel '{tunnel}'.")
return
for entry in events[-lines:]:
ts = entry.get("timestamp", "")
event = entry.get("event", "")
actor = entry.get("actor", "")
detail = entry.get("detail", "")
parts = [ts, event, f"actor={actor}"]
if detail:
parts.append(detail)
typer.echo(" ".join(parts))
if follow:
import time
log_path = sd / f"{tunnel}.log"
try:
with log_path.open() as f:
f.seek(0, 2)
while True:
line = f.readline()
if line:
try:
entry = json.loads(line)
ts = entry.get("timestamp", "")
event = entry.get("event", "")
actor = entry.get("actor", "")
detail = entry.get("detail", "")
parts = [ts, event, f"actor={actor}"]
if detail:
parts.append(detail)
typer.echo(" ".join(parts))
except json.JSONDecodeError:
pass
else:
time.sleep(0.5)
except KeyboardInterrupt:
pass
# ─── targets commands ─────────────────────────────────────────────────────────
@targets_app.callback(invoke_without_command=True)
def targets_default(
ctx: typer.Context,
domain: Optional[str] = typer.Option(None, "--domain", help="Filter by domain"),
as_json: bool = typer.Option(False, "--json", help="Output as JSON"),
):
"""List infrastructure targets from the OpsCatalog."""
if ctx.invoked_subcommand is not None:
return
cfg = _load_or_exit()
cat = _load_catalog_or_exit(cfg)
rows = []
for t in cat.targets.values():
if domain and t.domain != domain:
continue
rows.append({
"domain": t.domain,
"target": t.id,
"kind": t.kind,
"description": t.description,
"bridges": t.reachable_via,
})
if as_json:
typer.echo(json.dumps(rows, indent=2))
else:
if not rows:
typer.echo("No targets found.")
return
headers = ["DOMAIN", "TARGET", "KIND", "BRIDGES"]
col_widths = [
max(len(h), max((len(str(r.get(h.lower(), "") or "")) for r in rows), default=0))
for h in headers
]
def _fmt(vals):
return " ".join(str(v).ljust(w) for v, w in zip(vals, col_widths))
typer.echo(_fmt(headers))
typer.echo(_fmt(["-" * w for w in col_widths]))
for row in rows:
typer.echo(_fmt([
row["domain"],
row["target"],
row["kind"],
", ".join(row["bridges"]),
]))
@targets_app.command("show")
def targets_show(
target: str = typer.Argument(..., help="Target ID"),
):
"""Show full metadata for a target."""
cfg = _load_or_exit()
cat = _load_catalog_or_exit(cfg)
if target not in cat.targets:
typer.echo(f"Error: target '{target}' not found in catalog", err=True)
raise typer.Exit(1)
t = cat.targets[target]
typer.echo(f"Target: {t.id}")
typer.echo(f"Domain: {t.domain}")
typer.echo(f"Kind: {t.kind}")
if t.description:
typer.echo(f"Description: {t.description}")
if t.reachable_via:
typer.echo(f"Bridges: {', '.join(t.reachable_via)}")
# Show ops notes from docs/ if available
if cfg.catalog_path:
docs_dir = cfg.catalog_path / "domains" / t.domain / "docs"
if docs_dir.exists():
for md_file in sorted(docs_dir.glob("*.md")):
typer.echo(f"\n--- {md_file.name} ---")
typer.echo(md_file.read_text())
# ─── catalog commands ─────────────────────────────────────────────────────────
@catalog_app.callback(invoke_without_command=True)
def catalog_default(ctx: typer.Context):
"""Inspect and validate the OpsCatalog."""
if ctx.invoked_subcommand is None:
typer.echo(ctx.get_help())
@catalog_app.command("list")
def catalog_list(
as_json: bool = typer.Option(False, "--json", help="Output as JSON"),
):
"""List all domains with target and bridge counts."""
cfg = _load_or_exit()
cat = _load_catalog_or_exit(cfg)
rows = []
for domain in cat.domains.values():
target_count = sum(1 for t in cat.targets.values() if t.domain == domain.id)
bridge_count = sum(1 for b in cat.bridges.values() if b.domain == domain.id)
rows.append({
"domain": domain.id,
"name": domain.name,
"environment": domain.environment,
"targets": target_count,
"bridges": bridge_count,
})
if as_json:
typer.echo(json.dumps(rows, indent=2))
else:
if not rows:
typer.echo("Catalog is empty.")
return
headers = ["DOMAIN", "NAME", "ENV", "TARGETS", "BRIDGES"]
col_widths = [
max(len(h), max((len(str(r.get(h.lower()[:3] if h == "ENV" else h.lower(), "") or "")) for r in rows), default=0))
for h in headers
]
# Manual col widths for cleaner output
col_widths = [
max(len("DOMAIN"), max((len(r["domain"]) for r in rows), default=0)),
max(len("NAME"), max((len(r["name"]) for r in rows), default=0)),
max(len("ENV"), max((len(r["environment"]) for r in rows), default=0)),
max(len("TARGETS"), max((len(str(r["targets"])) for r in rows), default=0)),
max(len("BRIDGES"), max((len(str(r["bridges"])) for r in rows), default=0)),
]
def _fmt(vals):
return " ".join(str(v).ljust(w) for v, w in zip(vals, col_widths))
typer.echo(_fmt(headers))
typer.echo(_fmt(["-" * w for w in col_widths]))
for row in rows:
typer.echo(_fmt([
row["domain"], row["name"], row["environment"],
str(row["targets"]), str(row["bridges"]),
]))
@catalog_app.command("validate")
def catalog_validate():
"""Validate catalog for consistency errors."""
from bridge.catalog.validator import validate_catalog
cfg = _load_or_exit()
cat = _load_catalog_or_exit(cfg)
errors = validate_catalog(cat)
if errors:
typer.echo(f"Catalog has {len(errors)} violation(s):")
for err in errors:
typer.echo(f" - {err}")
raise typer.Exit(1)
else:
typer.echo(f"Catalog OK — {len(cat.domains)} domain(s), {len(cat.targets)} target(s), {len(cat.bridges)} bridge(s).")
@catalog_app.command("show")
def catalog_show(
bridge_id: str = typer.Argument(..., help="Bridge ID"),
):
"""Show full metadata for a bridge."""
cfg = _load_or_exit()
cat = _load_catalog_or_exit(cfg)
if bridge_id not in cat.bridges:
typer.echo(f"Error: bridge '{bridge_id}' not found in catalog", err=True)
raise typer.Exit(1)
b = cat.bridges[bridge_id]
typer.echo(f"Bridge: {b.id}")
typer.echo(f"Domain: {b.domain}")
typer.echo(f"Target: {b.target}")
typer.echo(f"Host: {b.host}")
typer.echo(f"Ports: {b.remote_port} -> {b.local_port}")
typer.echo(f"SSH user: {b.ssh_user}")
typer.echo(f"Actor: {b.actor}")
typer.echo(f"Method: {b.access_method}")
if b.description:
typer.echo(f"Description: {b.description}")
if b.health_check:
typer.echo(f"Health: {b.health_check.url} (every {b.health_check.interval_seconds}s)")
# Domain context
if b.domain in cat.domains:
d = cat.domains[b.domain]
typer.echo(f"\nDomain context: {d.name} [{d.environment}]")
# Target context
if b.target in cat.targets:
t = cat.targets[b.target]
typer.echo(f"Target: {t.description or t.id} ({t.kind})")

115
src/bridge/config.py Normal file
View File

@@ -0,0 +1,115 @@
"""Config loading for OpsBridge."""
from __future__ import annotations
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Optional
import yaml
from bridge.models import ActorInfo, HealthCheckConfig, ReconnectPolicy, TunnelConfig
class ConfigError(Exception):
"""Raised when config is invalid or missing."""
@dataclass
class BridgeConfig:
tunnels: Dict[str, TunnelConfig]
actors: Dict[str, ActorInfo]
catalog_path: Optional[Path] = None
def _default_config_path() -> Path:
return Path.home() / ".config" / "bridge" / "tunnels.yaml"
def load_config() -> BridgeConfig:
"""Load and validate tunnels.yaml. Respects BRIDGE_CONFIG env var."""
path = Path(os.environ.get("BRIDGE_CONFIG", str(_default_config_path())))
if not path.exists():
raise ConfigError(f"Config file not found: {path}")
try:
with path.open() as f:
raw = yaml.safe_load(f)
except yaml.YAMLError as e:
raise ConfigError(f"Invalid YAML in {path}: {e}") from e
if not isinstance(raw, dict):
raise ConfigError(f"Config must be a YAML mapping, got: {type(raw)}")
tunnels = _parse_tunnels(raw.get("tunnels") or {})
actors = _parse_actors(raw.get("actors") or {})
catalog_path = None
if "catalog_path" in raw and raw["catalog_path"]:
catalog_path = Path(os.path.expanduser(str(raw["catalog_path"])))
return BridgeConfig(tunnels=tunnels, actors=actors, catalog_path=catalog_path)
def _parse_tunnels(raw: dict) -> Dict[str, TunnelConfig]:
tunnels = {}
for name, data in raw.items():
if not isinstance(data, dict):
raise ConfigError(f"Tunnel '{name}' must be a mapping")
tunnels[name] = _parse_tunnel(name, data)
return tunnels
def _parse_tunnel(name: str, data: dict) -> TunnelConfig:
required = ["host", "remote_port", "local_port", "ssh_user", "ssh_key", "actor"]
for field in required:
if field not in data:
raise ConfigError(f"Tunnel '{name}' missing required field: {field}")
reconnect = ReconnectPolicy()
if "reconnect" in data and data["reconnect"]:
r = data["reconnect"]
reconnect = ReconnectPolicy(
max_attempts=r.get("max_attempts", 0),
backoff_initial=r.get("backoff_initial", 5),
backoff_max=r.get("backoff_max", 60),
)
health_check = None
if "health_check" in data and data["health_check"]:
hc = data["health_check"]
if "url" not in hc:
raise ConfigError(f"Tunnel '{name}' health_check missing required field: url")
health_check = HealthCheckConfig(
url=hc["url"],
interval_seconds=hc.get("interval_seconds", 30),
timeout_seconds=hc.get("timeout_seconds", 5),
)
return TunnelConfig(
name=name,
host=str(data["host"]),
remote_port=int(data["remote_port"]),
local_port=int(data["local_port"]),
ssh_user=str(data["ssh_user"]),
ssh_key=str(data["ssh_key"]),
actor=str(data["actor"]),
reconnect=reconnect,
health_check=health_check,
)
def _parse_actors(raw: dict) -> Dict[str, ActorInfo]:
actors = {}
for name, data in raw.items():
if not isinstance(data, dict):
raise ConfigError(f"Actor '{name}' must be a mapping")
if "class" not in data:
raise ConfigError(f"Actor '{name}' missing required field: class")
actors[name] = ActorInfo(
name=name,
actor_class=str(data["class"]),
description=str(data.get("description", "")),
)
return actors

31
src/bridge/health.py Normal file
View File

@@ -0,0 +1,31 @@
"""HTTP health checker for OpsBridge."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Optional
import httpx
@dataclass
class HealthResult:
ok: bool
status_code: Optional[int] = None
error: Optional[str] = None
class HealthChecker:
def __init__(self, url: str, timeout_seconds: int = 5):
self._url = url
self._timeout = timeout_seconds
async def check(self) -> HealthResult:
try:
async with httpx.AsyncClient(timeout=self._timeout) as client:
response = await client.get(self._url)
response.raise_for_status()
return HealthResult(ok=True, status_code=response.status_code)
except httpx.HTTPStatusError as e:
return HealthResult(ok=False, status_code=e.response.status_code, error=str(e))
except Exception as e:
return HealthResult(ok=False, error=str(e))

252
src/bridge/manager.py Normal file
View File

@@ -0,0 +1,252 @@
"""Tunnel lifecycle manager for OpsBridge."""
from __future__ import annotations
import logging
import os
import signal
import subprocess
import time
from pathlib import Path
from typing import List, Optional
from bridge.audit import AuditEvent, AuditLogger
from bridge.config import BridgeConfig
from bridge.health import HealthChecker
from bridge.models import BridgeState, TunnelConfig
from bridge.state import StateManager
log = logging.getLogger(__name__)
def build_ssh_command(cfg: TunnelConfig) -> List[str]:
"""Build the SSH reverse tunnel command."""
key = os.path.expanduser(cfg.ssh_key)
return [
"ssh",
"-N",
"-R", f"{cfg.remote_port}:127.0.0.1:{cfg.local_port}",
"-i", key,
"-o", "ServerAliveInterval=10",
"-o", "ServerAliveCountMax=3",
"-o", "ExitOnForwardFailure=yes",
"-o", "StrictHostKeyChecking=accept-new",
f"{cfg.ssh_user}@{cfg.host}",
]
class TunnelManager:
"""Manages a single named SSH reverse tunnel.
start() daemonises: forks a child that runs the reconnect loop, then the
parent returns immediately after writing the manager PID.
"""
def __init__(self, cfg: TunnelConfig, state_dir: Optional[Path] = None):
self._cfg = cfg
self._state = StateManager(state_dir=state_dir)
self._audit = AuditLogger(state_dir=state_dir)
def get_state(self) -> BridgeState:
return self._state.read_state(self._cfg.name)
def is_running(self) -> bool:
return self._state.is_running(self._cfg.name)
def _actor_info(self):
return self._cfg.actor, "unknown"
def _next_backoff(self, attempt: int) -> int:
initial = self._cfg.reconnect.backoff_initial
max_b = self._cfg.reconnect.backoff_max
value = initial * (2 ** attempt)
return min(value, max_b)
def start(self) -> None:
"""Start the tunnel manager as a daemonised subprocess."""
if self.is_running():
log.info("Tunnel %s already running", self._cfg.name)
return
self._state.write_state(self._cfg.name, BridgeState.STARTING)
actor, actor_class = self._actor_info()
self._audit.log(
tunnel=self._cfg.name,
event=AuditEvent.BRIDGE_STARTED,
actor=actor,
actor_class=actor_class,
)
pid = os.fork()
if pid > 0:
# Parent: record manager PID and return
self._state.write_pid(self._cfg.name, pid)
return
# Child: become a daemon
os.setsid()
try:
self._run_loop()
except Exception as e:
log.exception("Tunnel manager loop crashed: %s", e)
finally:
self._state.write_state(self._cfg.name, BridgeState.STOPPED)
self._state.clear_pid(self._cfg.name)
self._audit.log(
tunnel=self._cfg.name,
event=AuditEvent.BRIDGE_STOPPED,
actor=actor,
actor_class=actor_class,
)
os._exit(0)
def stop(self) -> None:
"""Stop the running tunnel manager."""
pid = self._state.read_pid(self._cfg.name)
if pid is None:
self._state.write_state(self._cfg.name, BridgeState.STOPPED)
return
try:
os.kill(pid, signal.SIGTERM)
# Give up to 5 seconds for graceful shutdown
for _ in range(50):
try:
os.kill(pid, 0)
time.sleep(0.1)
except ProcessLookupError:
break
else:
# Force kill if still running
try:
os.kill(pid, signal.SIGKILL)
except ProcessLookupError:
pass
except ProcessLookupError:
pass
self._state.clear_pid(self._cfg.name)
self._state.write_state(self._cfg.name, BridgeState.STOPPED)
actor, actor_class = self._actor_info()
self._audit.log(
tunnel=self._cfg.name,
event=AuditEvent.BRIDGE_STOPPED,
actor=actor,
actor_class=actor_class,
)
def _run_loop(self) -> None:
"""Reconnect loop running in daemon child."""
import asyncio
cfg = self._cfg
actor, actor_class = self._actor_info()
attempt = 0
max_attempts = cfg.reconnect.max_attempts # 0 = infinite
# Setup signal handler for graceful shutdown
_stop = [False]
def _on_term(signum, frame):
_stop[0] = True
signal.signal(signal.SIGTERM, _on_term)
signal.signal(signal.SIGINT, _on_term)
while not _stop[0]:
if max_attempts > 0 and attempt >= max_attempts:
self._state.write_state(cfg.name, BridgeState.FAILED)
break
cmd = build_ssh_command(cfg)
log.info("Starting SSH: %s", " ".join(cmd))
self._state.write_state(cfg.name, BridgeState.STARTING)
try:
proc = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
except FileNotFoundError:
self._state.write_state(cfg.name, BridgeState.FAILED)
self._audit.log(
tunnel=cfg.name,
event=AuditEvent.BRIDGE_DISCONNECTED,
actor=actor,
actor_class=actor_class,
detail="ssh binary not found",
)
break
# Wait briefly then assume connected if still running
time.sleep(2)
if proc.poll() is None:
self._state.write_state(cfg.name, BridgeState.CONNECTED)
self._audit.log(
tunnel=cfg.name,
event=AuditEvent.BRIDGE_CONNECTED,
actor=actor,
actor_class=actor_class,
)
attempt = 0
# Health check loop
if cfg.health_check:
checker = HealthChecker(
url=cfg.health_check.url,
timeout_seconds=cfg.health_check.timeout_seconds,
)
health_failing = False
while not _stop[0] and proc.poll() is None:
result = asyncio.run(checker.check())
if result.ok:
if health_failing:
health_failing = False
self._state.write_state(cfg.name, BridgeState.CONNECTED)
self._audit.log(
tunnel=cfg.name,
event=AuditEvent.HEALTH_CHECK_RECOVERED,
actor=actor,
actor_class=actor_class,
)
else:
if not health_failing:
health_failing = True
self._state.write_state(cfg.name, BridgeState.DEGRADED)
self._audit.log(
tunnel=cfg.name,
event=AuditEvent.HEALTH_CHECK_FAILED,
actor=actor,
actor_class=actor_class,
detail=result.error or f"HTTP {result.status_code}",
)
time.sleep(cfg.health_check.interval_seconds)
else:
while not _stop[0] and proc.poll() is None:
time.sleep(1)
# SSH exited
if proc.poll() is not None:
self._audit.log(
tunnel=cfg.name,
event=AuditEvent.BRIDGE_DISCONNECTED,
actor=actor,
actor_class=actor_class,
detail=f"exit code {proc.returncode}",
)
if _stop[0]:
if proc.poll() is None:
proc.terminate()
break
attempt += 1
backoff = self._next_backoff(attempt - 1)
self._state.write_state(cfg.name, BridgeState.RECONNECTING)
self._audit.log(
tunnel=cfg.name,
event=AuditEvent.BRIDGE_RECONNECTING,
actor=actor,
actor_class=actor_class,
detail=f"retry {attempt}, backoff {backoff}s",
)
log.info("Reconnecting in %ds (attempt %d)", backoff, attempt)
time.sleep(backoff)

49
src/bridge/models.py Normal file
View File

@@ -0,0 +1,49 @@
"""Domain models for OpsBridge."""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
class BridgeState(str, Enum):
STOPPED = "stopped"
STARTING = "starting"
CONNECTED = "connected"
DEGRADED = "degraded"
RECONNECTING = "reconnecting"
FAILED = "failed"
@dataclass
class ReconnectPolicy:
max_attempts: int = 0 # 0 = infinite
backoff_initial: int = 5
backoff_max: int = 60
@dataclass
class HealthCheckConfig:
url: str
interval_seconds: int = 30
timeout_seconds: int = 5
@dataclass
class TunnelConfig:
name: str
host: str
remote_port: int
local_port: int
ssh_user: str
ssh_key: str
actor: str
reconnect: ReconnectPolicy = field(default_factory=ReconnectPolicy)
health_check: Optional[HealthCheckConfig] = None
@dataclass
class ActorInfo:
name: str
actor_class: str # "human" or "automation"
description: str = ""

73
src/bridge/state.py Normal file
View File

@@ -0,0 +1,73 @@
"""State file management for OpsBridge."""
from __future__ import annotations
import os
from pathlib import Path
from typing import Optional
from bridge.models import BridgeState
def _default_state_dir() -> Path:
return Path.home() / ".local" / "state" / "bridge"
class StateManager:
def __init__(self, state_dir: Optional[Path] = None):
self._dir = Path(state_dir) if state_dir else _default_state_dir()
def _ensure_dir(self) -> None:
self._dir.mkdir(parents=True, exist_ok=True)
def _state_path(self, name: str) -> Path:
return self._dir / f"{name}.state"
def _pid_path(self, name: str) -> Path:
return self._dir / f"{name}.pid"
def read_state(self, name: str) -> BridgeState:
path = self._state_path(name)
if not path.exists():
return BridgeState.STOPPED
text = path.read_text().strip()
try:
return BridgeState(text)
except ValueError:
return BridgeState.STOPPED
def write_state(self, name: str, state: BridgeState) -> None:
self._ensure_dir()
self._state_path(name).write_text(state.value)
def read_pid(self, name: str) -> Optional[int]:
path = self._pid_path(name)
if not path.exists():
return None
try:
pid = int(path.read_text().strip())
except (ValueError, OSError):
return None
if _pid_alive(pid):
return pid
return None
def write_pid(self, name: str, pid: int) -> None:
self._ensure_dir()
self._pid_path(name).write_text(str(pid))
def clear_pid(self, name: str) -> None:
path = self._pid_path(name)
if path.exists():
path.unlink()
def is_running(self, name: str) -> bool:
return self.read_pid(name) is not None
def _pid_alive(pid: int) -> bool:
"""Return True if the process with given PID exists."""
try:
os.kill(pid, 0)
return True
except (ProcessLookupError, PermissionError):
return False

0
tests/__init__.py Normal file
View File

90
tests/test_audit.py Normal file
View File

@@ -0,0 +1,90 @@
"""Tests for audit logging."""
import json
from pathlib import Path
import pytest
from bridge.audit import AuditLogger, AuditEvent
@pytest.fixture
def log_dir(tmp_path):
return tmp_path / "bridge"
@pytest.fixture
def logger(log_dir):
return AuditLogger(state_dir=log_dir)
class TestAuditLogger:
def test_log_event_creates_file(self, logger, log_dir):
logger.log(
tunnel="my-tunnel",
event=AuditEvent.BRIDGE_STARTED,
actor="operator.bernd",
actor_class="human",
)
log_file = log_dir / "my-tunnel.log"
assert log_file.exists()
def test_log_event_is_json_line(self, logger, log_dir):
logger.log(
tunnel="my-tunnel",
event=AuditEvent.BRIDGE_STARTED,
actor="operator.bernd",
actor_class="human",
)
lines = (log_dir / "my-tunnel.log").read_text().strip().splitlines()
assert len(lines) == 1
entry = json.loads(lines[0])
assert entry["tunnel"] == "my-tunnel"
assert entry["event"] == "bridge_started"
assert entry["actor"] == "operator.bernd"
assert entry["actor_class"] == "human"
assert "timestamp" in entry
def test_multiple_events_append(self, logger, log_dir):
for event in [AuditEvent.BRIDGE_STARTED, AuditEvent.BRIDGE_CONNECTED, AuditEvent.BRIDGE_STOPPED]:
logger.log(tunnel="t", event=event, actor="a", actor_class="human")
lines = (log_dir / "t.log").read_text().strip().splitlines()
assert len(lines) == 3
def test_log_with_detail(self, logger, log_dir):
logger.log(
tunnel="t",
event=AuditEvent.HEALTH_CHECK_FAILED,
actor="a",
actor_class="automation",
detail="connection refused",
)
entry = json.loads((log_dir / "t.log").read_text().strip())
assert entry["detail"] == "connection refused"
def test_all_event_types_defined(self):
events = {e.value for e in AuditEvent}
assert "bridge_started" in events
assert "bridge_connected" in events
assert "bridge_disconnected" in events
assert "bridge_reconnecting" in events
assert "health_check_failed" in events
assert "health_check_recovered" in events
assert "bridge_stopped" in events
def test_timestamp_is_iso8601(self, logger, log_dir):
from datetime import datetime
logger.log(tunnel="t", event=AuditEvent.BRIDGE_STOPPED, actor="a", actor_class="human")
entry = json.loads((log_dir / "t.log").read_text().strip())
# Should parse without error
dt = datetime.fromisoformat(entry["timestamp"])
assert dt.tzinfo is not None or True # UTC or naive both acceptable
def test_read_events(self, logger, log_dir):
logger.log(tunnel="t", event=AuditEvent.BRIDGE_STARTED, actor="a", actor_class="human")
logger.log(tunnel="t", event=AuditEvent.BRIDGE_STOPPED, actor="a", actor_class="human")
events = logger.read_events("t")
assert len(events) == 2
assert events[0]["event"] == "bridge_started"
def test_read_events_missing_returns_empty(self, logger):
assert logger.read_events("nonexistent") == []

203
tests/test_catalog_cli.py Normal file
View File

@@ -0,0 +1,203 @@
"""Tests for catalog CLI commands (targets, catalog list/validate/show)."""
import json
import textwrap
from pathlib import Path
import pytest
from typer.testing import CliRunner
from bridge.cli import app
runner = CliRunner()
# Config with catalog_path pointing to a fixture
BASE_CONFIG = textwrap.dedent("""\
tunnels: {{}}
actors: {{}}
catalog_path: {catalog_path}
""")
CONFIG_NO_CATALOG = textwrap.dedent("""\
tunnels: {}
actors: {}
""")
@pytest.fixture
def catalog_dir(tmp_path):
root = tmp_path / "opscatalog"
domain_dir = root / "domains" / "coulombcore"
(domain_dir / "targets").mkdir(parents=True)
(domain_dir / "bridges").mkdir(parents=True)
actors_dir = root / "actors"
actors_dir.mkdir(parents=True)
(domain_dir / "domain.yaml").write_text(textwrap.dedent("""\
type: domain
id: coulombcore
name: CoulombCore Infrastructure
description: Core infra
environment: production
"""))
(domain_dir / "targets" / "state-hub.yaml").write_text(textwrap.dedent("""\
type: target
id: state-hub
domain: coulombcore
kind: service
description: State coordination service
reachable_via:
- state-hub-coulombcore
"""))
(domain_dir / "bridges" / "state-hub-coulombcore.yaml").write_text(textwrap.dedent("""\
type: bridge
id: state-hub-coulombcore
domain: coulombcore
target: state-hub
description: Ops bridge for state hub
access_method: ssh-reverse
host: coulombcore.local
remote_port: 18000
local_port: 8000
ssh_user: ubuntu
ssh_key: ~/.ssh/id_ops
actor: agent.claude-coulombcore
"""))
(actors_dir / "agents.yaml").write_text(textwrap.dedent("""\
type: actor
id: agent.claude-coulombcore
class: automation
description: Claude Code agent
"""))
return root
@pytest.fixture
def config_file(tmp_path, catalog_dir):
f = tmp_path / "tunnels.yaml"
f.write_text(BASE_CONFIG.format(catalog_path=str(catalog_dir)))
return f
@pytest.fixture
def env(config_file, tmp_path):
return {
"BRIDGE_CONFIG": str(config_file),
"BRIDGE_STATE_DIR": str(tmp_path / "state"),
}
class TestTargetsCommand:
def test_targets_shows_table(self, env):
result = runner.invoke(app, ["targets"], env=env)
assert result.exit_code == 0
assert "state-hub" in result.output
def test_targets_json(self, env):
result = runner.invoke(app, ["targets", "--json"], env=env)
assert result.exit_code == 0
data = json.loads(result.output)
assert isinstance(data, list)
assert any(t["target"] == "state-hub" for t in data)
assert any(t["domain"] == "coulombcore" for t in data)
def test_targets_domain_filter(self, env):
result = runner.invoke(app, ["targets", "--domain", "coulombcore"], env=env)
assert result.exit_code == 0
assert "state-hub" in result.output
def test_targets_domain_filter_unknown(self, env):
result = runner.invoke(app, ["targets", "--domain", "nonexistent"], env=env)
assert result.exit_code == 0
# No results but no crash
def test_targets_no_catalog_configured(self, tmp_path):
f = tmp_path / "tunnels.yaml"
f.write_text(CONFIG_NO_CATALOG)
result = runner.invoke(app, ["targets"], env={"BRIDGE_CONFIG": str(f)})
assert result.exit_code == 1
assert "catalog" in result.output.lower()
def test_targets_show_subcommand(self, env):
result = runner.invoke(app, ["targets", "show", "state-hub"], env=env)
assert result.exit_code == 0
assert "state-hub" in result.output
assert "coulombcore" in result.output
def test_targets_show_unknown(self, env):
result = runner.invoke(app, ["targets", "show", "nonexistent"], env=env)
assert result.exit_code == 1
class TestCatalogCommand:
def test_catalog_list(self, env):
result = runner.invoke(app, ["catalog", "list"], env=env)
assert result.exit_code == 0
assert "coulombcore" in result.output
def test_catalog_list_json(self, env):
result = runner.invoke(app, ["catalog", "list", "--json"], env=env)
assert result.exit_code == 0
data = json.loads(result.output)
assert isinstance(data, list)
assert any(d["domain"] == "coulombcore" for d in data)
def test_catalog_validate_clean(self, env):
result = runner.invoke(app, ["catalog", "validate"], env=env)
assert result.exit_code == 0
assert "valid" in result.output.lower() or "ok" in result.output.lower() or "0" in result.output
def test_catalog_validate_with_errors(self, tmp_path):
# Catalog with dangling reference
root = tmp_path / "bad-catalog"
domain_dir = root / "domains" / "d"
(domain_dir / "targets").mkdir(parents=True)
(domain_dir / "domain.yaml").write_text(
"type: domain\nid: d\nname: D\n"
)
(domain_dir / "targets" / "t.yaml").write_text(
"type: target\nid: t\ndomain: d\nkind: service\nreachable_via:\n - missing-bridge\n"
)
f = tmp_path / "tunnels.yaml"
f.write_text(BASE_CONFIG.format(catalog_path=str(root)))
result = runner.invoke(app, ["catalog", "validate"], env={"BRIDGE_CONFIG": str(f)})
assert result.exit_code == 1
assert "missing-bridge" in result.output
def test_catalog_show(self, env):
result = runner.invoke(app, ["catalog", "show", "state-hub-coulombcore"], env=env)
assert result.exit_code == 0
assert "state-hub-coulombcore" in result.output
assert "coulombcore.local" in result.output
def test_catalog_show_unknown(self, env):
result = runner.invoke(app, ["catalog", "show", "nonexistent"], env=env)
assert result.exit_code == 1
def test_catalog_no_catalog_configured(self, tmp_path):
f = tmp_path / "tunnels.yaml"
f.write_text(CONFIG_NO_CATALOG)
result = runner.invoke(app, ["catalog", "list"], env={"BRIDGE_CONFIG": str(f)})
assert result.exit_code == 1
class TestUpWithCatalogFallback:
def test_up_resolves_catalog_bridge(self, env):
"""bridge up <catalog-bridge-name> works when name not in inline tunnels.yaml."""
from unittest.mock import MagicMock, patch
with patch("bridge.cli.TunnelManager") as mock_mgr_cls:
mock_mgr = MagicMock()
mock_mgr.is_running.return_value = False
mock_mgr_cls.return_value = mock_mgr
result = runner.invoke(app, ["up", "state-hub-coulombcore"], env=env)
assert result.exit_code == 0
mock_mgr.start.assert_called_once()
def test_up_unknown_bridge_exit_1(self, env):
result = runner.invoke(app, ["up", "totally-nonexistent"], env=env)
assert result.exit_code == 1

View File

@@ -0,0 +1,196 @@
"""Integration tests for OpsCatalog (T14-T16 from BRIDGE-WP-0002)."""
import json
import textwrap
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from typer.testing import CliRunner
from bridge.catalog.loader import load_catalog
from bridge.catalog.resolver import BridgeNotFound, resolve
from bridge.catalog.validator import validate_catalog
from bridge.cli import app
runner = CliRunner()
@pytest.fixture
def catalog_dir(tmp_path):
root = tmp_path / "opscatalog"
domain_dir = root / "domains" / "coulombcore"
(domain_dir / "targets").mkdir(parents=True)
(domain_dir / "bridges").mkdir(parents=True)
(domain_dir / "docs").mkdir(parents=True)
actors_dir = root / "actors"
actors_dir.mkdir(parents=True)
(domain_dir / "domain.yaml").write_text(textwrap.dedent("""\
type: domain
id: coulombcore
name: CoulombCore Infrastructure
description: Core infra
environment: production
"""))
(domain_dir / "targets" / "state-hub.yaml").write_text(textwrap.dedent("""\
type: target
id: state-hub
domain: coulombcore
kind: service
description: State coordination service
reachable_via:
- state-hub-coulombcore
"""))
(domain_dir / "bridges" / "state-hub-coulombcore.yaml").write_text(textwrap.dedent("""\
type: bridge
id: state-hub-coulombcore
domain: coulombcore
target: state-hub
description: Ops bridge for state hub
access_method: ssh-reverse
host: coulombcore.local
remote_port: 18000
local_port: 8000
ssh_user: ubuntu
ssh_key: ~/.ssh/id_ops
actor: agent.claude-coulombcore
reconnect:
max_attempts: 0
backoff_initial: 5
backoff_max: 60
"""))
(actors_dir / "agents.yaml").write_text(textwrap.dedent("""\
type: actor
id: agent.claude-coulombcore
class: automation
description: Claude Code agent on CoulombCore
"""))
(domain_dir / "docs" / "overview.md").write_text(
"# CoulombCore Overview\nCore infrastructure notes."
)
return root
@pytest.fixture
def config_with_catalog(tmp_path, catalog_dir):
f = tmp_path / "tunnels.yaml"
f.write_text(textwrap.dedent(f"""\
catalog_path: {catalog_dir}
tunnels: {{}}
actors: {{}}
"""))
return f
@pytest.fixture
def env(config_with_catalog, tmp_path):
return {
"BRIDGE_CONFIG": str(config_with_catalog),
"BRIDGE_STATE_DIR": str(tmp_path / "state"),
}
class TestT14CatalogLoadAndResolve:
def test_catalog_loads_all_types(self, catalog_dir):
cat = load_catalog(catalog_dir)
assert "coulombcore" in cat.domains
assert "state-hub" in cat.targets
assert "state-hub-coulombcore" in cat.bridges
assert "agent.claude-coulombcore" in cat.actors
def test_resolve_from_catalog(self, catalog_dir):
cat = load_catalog(catalog_dir)
tc = resolve("state-hub-coulombcore", catalog=cat, inline_tunnels={})
assert tc.name == "state-hub-coulombcore"
assert tc.host == "coulombcore.local"
assert tc.remote_port == 18000
def test_bridge_up_with_catalog_bridge(self, env):
with patch("bridge.cli.TunnelManager") as mock_mgr_cls:
mock_mgr = MagicMock()
mock_mgr.is_running.return_value = False
mock_mgr_cls.return_value = mock_mgr
result = runner.invoke(app, ["up", "state-hub-coulombcore"], env=env)
assert result.exit_code == 0
mock_mgr.start.assert_called_once()
# Verify TunnelManager was constructed with correct config
call_args = mock_mgr_cls.call_args
tcfg = call_args[0][0]
assert tcfg.host == "coulombcore.local"
assert tcfg.remote_port == 18000
class TestT15BridgeTargetsOutput:
def test_targets_table(self, env):
result = runner.invoke(app, ["targets"], env=env)
assert result.exit_code == 0
assert "state-hub" in result.output
assert "coulombcore" in result.output
assert "service" in result.output
def test_targets_json_structure(self, env):
result = runner.invoke(app, ["targets", "--json"], env=env)
assert result.exit_code == 0
data = json.loads(result.output)
assert len(data) == 1
t = data[0]
assert t["target"] == "state-hub"
assert t["domain"] == "coulombcore"
assert t["kind"] == "service"
assert "state-hub-coulombcore" in t["bridges"]
def test_targets_show_includes_docs(self, env):
result = runner.invoke(app, ["targets", "show", "state-hub"], env=env)
assert result.exit_code == 0
assert "state-hub" in result.output
assert "coulombcore" in result.output
class TestT16CatalogValidate:
def test_validate_clean_catalog_exit_0(self, env):
result = runner.invoke(app, ["catalog", "validate"], env=env)
assert result.exit_code == 0
assert "ok" in result.output.lower() or "0" in result.output
def test_validate_dangling_reference_exit_1(self, tmp_path):
root = tmp_path / "bad"
domain_dir = root / "domains" / "d"
(domain_dir / "targets").mkdir(parents=True)
(domain_dir / "bridges").mkdir(parents=True)
(root / "actors").mkdir(parents=True)
(domain_dir / "domain.yaml").write_text("type: domain\nid: d\nname: D\n")
(domain_dir / "targets" / "t.yaml").write_text(
"type: target\nid: t\ndomain: d\nkind: service\n"
"reachable_via:\n - nonexistent-bridge\n"
)
(domain_dir / "bridges" / "b.yaml").write_text(
"type: bridge\nid: b\ndomain: d\ntarget: t\n"
"host: h\nremote_port: 1\nlocal_port: 2\n"
"ssh_user: u\nssh_key: k\nactor: missing-actor\n"
)
f = tmp_path / "tunnels.yaml"
f.write_text(f"catalog_path: {root}\ntunnels: {{}}\nactors: {{}}\n")
result = runner.invoke(app, ["catalog", "validate"], env={"BRIDGE_CONFIG": str(f)})
assert result.exit_code == 1
assert "nonexistent-bridge" in result.output or "missing-actor" in result.output
def test_catalog_list_shows_counts(self, env):
result = runner.invoke(app, ["catalog", "list"], env=env)
assert result.exit_code == 0
assert "coulombcore" in result.output
def test_catalog_show_bridge(self, env):
result = runner.invoke(app, ["catalog", "show", "state-hub-coulombcore"], env=env)
assert result.exit_code == 0
assert "coulombcore.local" in result.output
assert "18000" in result.output
def test_validate_using_validator_directly(self, catalog_dir):
cat = load_catalog(catalog_dir)
errors = validate_catalog(cat)
assert errors == []

View File

@@ -0,0 +1,141 @@
"""Tests for catalog loader."""
import textwrap
from pathlib import Path
import pytest
from bridge.catalog.loader import CatalogLoadError, load_catalog
from bridge.catalog.models import Catalog
@pytest.fixture
def catalog_dir(tmp_path):
"""Build a minimal valid catalog fixture."""
root = tmp_path / "opscatalog"
domain_dir = root / "domains" / "coulombcore"
(domain_dir / "targets").mkdir(parents=True)
(domain_dir / "bridges").mkdir(parents=True)
(domain_dir / "docs").mkdir(parents=True)
actors_dir = root / "actors"
actors_dir.mkdir(parents=True)
(domain_dir / "domain.yaml").write_text(textwrap.dedent("""\
type: domain
id: coulombcore
name: CoulombCore Infrastructure
description: Core infra
environment: production
"""))
(domain_dir / "targets" / "state-hub.yaml").write_text(textwrap.dedent("""\
type: target
id: state-hub
domain: coulombcore
kind: service
description: State coordination service
reachable_via:
- state-hub-coulombcore
"""))
(domain_dir / "bridges" / "state-hub-coulombcore.yaml").write_text(textwrap.dedent("""\
type: bridge
id: state-hub-coulombcore
domain: coulombcore
target: state-hub
description: Ops bridge
access_method: ssh-reverse
host: coulombcore.local
remote_port: 18000
local_port: 8000
ssh_user: ubuntu
ssh_key: ~/.ssh/id_ops
actor: agent.claude-coulombcore
health_check:
url: http://127.0.0.1:18000/health
interval_seconds: 30
timeout_seconds: 5
reconnect:
max_attempts: 0
backoff_initial: 5
backoff_max: 60
"""))
(actors_dir / "agents.yaml").write_text(textwrap.dedent("""\
type: actor
id: agent.claude-coulombcore
class: automation
description: Claude Code agent on CoulombCore
"""))
(domain_dir / "docs" / "overview.md").write_text("# Overview\nSome ops notes.")
return root
class TestLoadCatalog:
def test_loads_domain(self, catalog_dir):
cat = load_catalog(catalog_dir)
assert "coulombcore" in cat.domains
d = cat.domains["coulombcore"]
assert d.name == "CoulombCore Infrastructure"
assert d.environment == "production"
def test_loads_target(self, catalog_dir):
cat = load_catalog(catalog_dir)
assert "state-hub" in cat.targets
t = cat.targets["state-hub"]
assert t.domain == "coulombcore"
assert t.kind == "service"
assert "state-hub-coulombcore" in t.reachable_via
def test_loads_bridge(self, catalog_dir):
cat = load_catalog(catalog_dir)
assert "state-hub-coulombcore" in cat.bridges
b = cat.bridges["state-hub-coulombcore"]
assert b.host == "coulombcore.local"
assert b.remote_port == 18000
assert b.health_check is not None
assert b.health_check.url == "http://127.0.0.1:18000/health"
assert b.reconnect is not None
assert b.reconnect.max_attempts == 0
def test_loads_actor(self, catalog_dir):
cat = load_catalog(catalog_dir)
assert "agent.claude-coulombcore" in cat.actors
a = cat.actors["agent.claude-coulombcore"]
assert a.actor_class == "automation"
def test_unknown_type_skipped(self, catalog_dir):
(catalog_dir / "domains" / "coulombcore" / "unknown.yaml").write_text(
"type: mystery\nid: x\n"
)
# Should not raise
cat = load_catalog(catalog_dir)
assert isinstance(cat, Catalog)
def test_empty_catalog_dir(self, tmp_path):
root = tmp_path / "empty"
root.mkdir()
cat = load_catalog(root)
assert cat.domains == {}
assert cat.bridges == {}
def test_missing_required_field_raises(self, tmp_path):
root = tmp_path / "bad"
domain_dir = root / "domains" / "x"
domain_dir.mkdir(parents=True)
(domain_dir / "domain.yaml").write_text("type: domain\nname: X\n")
with pytest.raises(CatalogLoadError, match="id"):
load_catalog(root)
def test_nonexistent_path_raises(self, tmp_path):
with pytest.raises(CatalogLoadError, match="not found"):
load_catalog(tmp_path / "nonexistent")
def test_invalid_yaml_raises(self, tmp_path):
root = tmp_path / "bad"
domain_dir = root / "domains" / "x"
domain_dir.mkdir(parents=True)
(domain_dir / "domain.yaml").write_text("type: domain\n[\nbad: yaml")
with pytest.raises(CatalogLoadError):
load_catalog(root)

View File

@@ -0,0 +1,116 @@
"""Tests for catalog domain models."""
import pytest
from bridge.catalog.models import (
ActorClass,
Catalog,
CatalogBridge,
CatalogDomain,
CatalogTarget,
)
class TestCatalogDomain:
def test_required_fields(self):
d = CatalogDomain(id="coulombcore", name="CoulombCore Infra")
assert d.id == "coulombcore"
assert d.name == "CoulombCore Infra"
def test_optional_fields_default(self):
d = CatalogDomain(id="x", name="X")
assert d.description == ""
assert d.environment == ""
class TestCatalogTarget:
def test_required_fields(self):
t = CatalogTarget(id="state-hub", domain="coulombcore", kind="service")
assert t.id == "state-hub"
assert t.domain == "coulombcore"
assert t.kind == "service"
def test_reachable_via_defaults_empty(self):
t = CatalogTarget(id="t", domain="d", kind="service")
assert t.reachable_via == []
def test_reachable_via(self):
t = CatalogTarget(id="t", domain="d", kind="service", reachable_via=["b1", "b2"])
assert t.reachable_via == ["b1", "b2"]
class TestCatalogBridge:
def test_required_fields(self):
b = CatalogBridge(
id="state-hub-coulombcore",
domain="coulombcore",
target="state-hub",
host="coulombcore.local",
remote_port=18000,
local_port=8000,
ssh_user="ubuntu",
ssh_key="~/.ssh/id_ops",
actor="agent.claude-coulombcore",
)
assert b.id == "state-hub-coulombcore"
assert b.domain == "coulombcore"
assert b.host == "coulombcore.local"
def test_optional_fields_default(self):
b = CatalogBridge(
id="b",
domain="d",
target="t",
host="h",
remote_port=1,
local_port=2,
ssh_user="u",
ssh_key="k",
actor="a",
)
assert b.description == ""
assert b.access_method == "ssh-reverse"
assert b.health_check is None
assert b.reconnect is None
def test_to_tunnel_config(self):
from bridge.models import TunnelConfig
b = CatalogBridge(
id="state-hub-coulombcore",
domain="coulombcore",
target="state-hub",
host="coulombcore.local",
remote_port=18000,
local_port=8000,
ssh_user="ubuntu",
ssh_key="~/.ssh/id_ops",
actor="agent.claude-coulombcore",
)
tc = b.to_tunnel_config()
assert isinstance(tc, TunnelConfig)
assert tc.name == "state-hub-coulombcore"
assert tc.host == "coulombcore.local"
assert tc.remote_port == 18000
class TestActorClass:
def test_fields(self):
a = ActorClass(id="agent.claude", actor_class="automation", description="Claude agent")
assert a.id == "agent.claude"
assert a.actor_class == "automation"
def test_optional_description(self):
a = ActorClass(id="x", actor_class="human")
assert a.description == ""
class TestCatalog:
def test_empty_catalog(self):
c = Catalog()
assert c.domains == {}
assert c.targets == {}
assert c.bridges == {}
assert c.actors == {}
def test_add_entries(self):
c = Catalog()
c.domains["d"] = CatalogDomain(id="d", name="D")
assert "d" in c.domains

View File

@@ -0,0 +1,89 @@
"""Tests for catalog resolver."""
import pytest
from bridge.catalog.models import (
ActorClass,
Catalog,
CatalogBridge,
CatalogDomain,
CatalogTarget,
)
from bridge.catalog.resolver import BridgeNotFound, resolve
from bridge.models import TunnelConfig, ReconnectPolicy
@pytest.fixture
def catalog():
cat = Catalog()
cat.domains["d"] = CatalogDomain(id="d", name="D")
cat.targets["t"] = CatalogTarget(id="t", domain="d", kind="service")
cat.bridges["catalog-bridge"] = CatalogBridge(
id="catalog-bridge",
domain="d",
target="t",
host="catalog-host.local",
remote_port=19000,
local_port=9000,
ssh_user="ubuntu",
ssh_key="~/.ssh/catalog",
actor="operator.bernd",
)
cat.actors["operator.bernd"] = ActorClass(id="operator.bernd", actor_class="human")
return cat
@pytest.fixture
def inline_tunnels():
return {
"inline-bridge": TunnelConfig(
name="inline-bridge",
host="inline-host.local",
remote_port=18000,
local_port=8000,
ssh_user="ubuntu",
ssh_key="~/.ssh/inline",
actor="operator.bernd",
)
}
class TestResolve:
def test_inline_takes_precedence(self, catalog, inline_tunnels):
tc = resolve("inline-bridge", catalog=catalog, inline_tunnels=inline_tunnels)
assert tc.host == "inline-host.local"
def test_catalog_fallback(self, catalog, inline_tunnels):
tc = resolve("catalog-bridge", catalog=catalog, inline_tunnels=inline_tunnels)
assert tc.host == "catalog-host.local"
assert tc.remote_port == 19000
def test_catalog_fallback_no_inline(self, catalog):
tc = resolve("catalog-bridge", catalog=catalog, inline_tunnels={})
assert tc.name == "catalog-bridge"
def test_missing_name_raises(self, catalog, inline_tunnels):
with pytest.raises(BridgeNotFound, match="nonexistent"):
resolve("nonexistent", catalog=catalog, inline_tunnels=inline_tunnels)
def test_missing_name_no_catalog_raises(self, inline_tunnels):
with pytest.raises(BridgeNotFound):
resolve("nonexistent", catalog=None, inline_tunnels=inline_tunnels)
def test_inline_bridge_returns_tunnel_config(self, catalog, inline_tunnels):
tc = resolve("inline-bridge", catalog=catalog, inline_tunnels=inline_tunnels)
assert isinstance(tc, TunnelConfig)
def test_catalog_bridge_returns_tunnel_config(self, catalog):
tc = resolve("catalog-bridge", catalog=catalog, inline_tunnels={})
assert isinstance(tc, TunnelConfig)
def test_catalog_is_none_no_inline_raises(self):
with pytest.raises(BridgeNotFound):
resolve("any-name", catalog=None, inline_tunnels={})
def test_resolve_preserves_reconnect_policy(self, catalog):
from bridge.models import ReconnectPolicy
catalog.bridges["catalog-bridge"].reconnect = ReconnectPolicy(
max_attempts=3, backoff_initial=2, backoff_max=30
)
tc = resolve("catalog-bridge", catalog=catalog, inline_tunnels={})
assert tc.reconnect.max_attempts == 3

View File

@@ -0,0 +1,94 @@
"""Tests for catalog validator."""
import pytest
from bridge.catalog.models import (
ActorClass,
Catalog,
CatalogBridge,
CatalogDomain,
CatalogTarget,
)
from bridge.catalog.validator import ValidationError, validate_catalog
def _make_full_catalog() -> Catalog:
cat = Catalog()
cat.domains["coulombcore"] = CatalogDomain(id="coulombcore", name="CoulombCore")
cat.targets["state-hub"] = CatalogTarget(
id="state-hub",
domain="coulombcore",
kind="service",
reachable_via=["state-hub-coulombcore"],
)
cat.bridges["state-hub-coulombcore"] = CatalogBridge(
id="state-hub-coulombcore",
domain="coulombcore",
target="state-hub",
host="host.local",
remote_port=18000,
local_port=8000,
ssh_user="ubuntu",
ssh_key="~/.ssh/id_ops",
actor="agent.claude-coulombcore",
)
cat.actors["agent.claude-coulombcore"] = ActorClass(
id="agent.claude-coulombcore",
actor_class="automation",
)
return cat
class TestValidateCatalog:
def test_valid_catalog_no_errors(self):
cat = _make_full_catalog()
errors = validate_catalog(cat)
assert errors == []
def test_target_domain_must_exist(self):
cat = _make_full_catalog()
cat.targets["orphan"] = CatalogTarget(
id="orphan", domain="nonexistent-domain", kind="service"
)
errors = validate_catalog(cat)
assert any("orphan" in e and "nonexistent-domain" in e for e in errors)
def test_target_reachable_via_must_exist(self):
cat = _make_full_catalog()
cat.targets["state-hub"].reachable_via.append("nonexistent-bridge")
errors = validate_catalog(cat)
assert any("nonexistent-bridge" in e for e in errors)
def test_bridge_domain_must_exist(self):
cat = _make_full_catalog()
cat.bridges["state-hub-coulombcore"].domain = "missing-domain"
errors = validate_catalog(cat)
assert any("missing-domain" in e for e in errors)
def test_bridge_target_must_exist(self):
cat = _make_full_catalog()
cat.bridges["state-hub-coulombcore"].target = "missing-target"
errors = validate_catalog(cat)
assert any("missing-target" in e for e in errors)
def test_bridge_actor_must_exist(self):
cat = _make_full_catalog()
cat.bridges["state-hub-coulombcore"].actor = "nonexistent-actor"
errors = validate_catalog(cat)
assert any("nonexistent-actor" in e for e in errors)
def test_multiple_errors_all_reported(self):
cat = Catalog()
# Target with dangling domain and reachable_via
cat.targets["t1"] = CatalogTarget(
id="t1", domain="missing", kind="service", reachable_via=["missing-bridge"]
)
# Bridge with dangling domain + target + actor
cat.bridges["b1"] = CatalogBridge(
id="b1", domain="missing", target="missing", host="h",
remote_port=1, local_port=2, ssh_user="u", ssh_key="k", actor="missing-actor",
)
errors = validate_catalog(cat)
assert len(errors) >= 4
def test_empty_catalog_is_valid(self):
cat = Catalog()
assert validate_catalog(cat) == []

201
tests/test_cli.py Normal file
View File

@@ -0,0 +1,201 @@
"""Tests for CLI commands."""
import json
import os
import textwrap
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from typer.testing import CliRunner
from bridge.cli import app
VALID_CONFIG = textwrap.dedent("""\
tunnels:
test-tunnel:
host: host.local
remote_port: 18000
local_port: 8000
ssh_user: ubuntu
ssh_key: ~/.ssh/id_ops
actor: operator.bernd
actors:
operator.bernd:
class: human
description: Bernd
""")
runner = CliRunner()
@pytest.fixture
def config_file(tmp_path):
f = tmp_path / "tunnels.yaml"
f.write_text(VALID_CONFIG)
return f
@pytest.fixture
def state_dir(tmp_path):
return tmp_path / "state"
@pytest.fixture
def env(config_file, state_dir):
return {"BRIDGE_CONFIG": str(config_file), "BRIDGE_STATE_DIR": str(state_dir)}
class TestHelpCommand:
def test_app_help(self):
result = runner.invoke(app, ["--help"])
assert result.exit_code == 0
assert "bridge" in result.output.lower() or "Usage" in result.output
def test_up_help(self):
result = runner.invoke(app, ["up", "--help"])
assert result.exit_code == 0
def test_down_help(self):
result = runner.invoke(app, ["down", "--help"])
assert result.exit_code == 0
def test_status_help(self):
result = runner.invoke(app, ["status", "--help"])
assert result.exit_code == 0
def test_logs_help(self):
result = runner.invoke(app, ["logs", "--help"])
assert result.exit_code == 0
def test_restart_help(self):
result = runner.invoke(app, ["restart", "--help"])
assert result.exit_code == 0
class TestStatusCommand:
def test_status_shows_tunnels(self, env, state_dir):
result = runner.invoke(app, ["status"], env=env)
assert result.exit_code == 0
assert "test-tunnel" in result.output
def test_status_json_flag(self, env, state_dir):
result = runner.invoke(app, ["status", "--json"], env=env)
assert result.exit_code == 0
data = json.loads(result.output)
assert isinstance(data, list)
assert len(data) == 1
assert data[0]["tunnel"] == "test-tunnel"
assert "state" in data[0]
assert "actor" in data[0]
assert "host" in data[0]
def test_status_shows_state(self, env, state_dir):
result = runner.invoke(app, ["status"], env=env)
assert result.exit_code == 0
assert "stopped" in result.output.lower()
def test_status_unknown_config_exit_1(self, tmp_path):
result = runner.invoke(app, ["status"], env={"BRIDGE_CONFIG": str(tmp_path / "no.yaml")})
assert result.exit_code == 1
class TestUpCommand:
def test_up_unknown_tunnel_exit_1(self, env):
result = runner.invoke(app, ["up", "nonexistent"], env=env)
assert result.exit_code == 1
assert "nonexistent" in result.output
def test_up_calls_manager_start(self, env, state_dir):
with patch("bridge.cli.TunnelManager") as mock_mgr_cls:
mock_mgr = MagicMock()
mock_mgr.is_running.return_value = False
mock_mgr_cls.return_value = mock_mgr
result = runner.invoke(app, ["up", "test-tunnel"], env=env)
assert result.exit_code == 0
mock_mgr.start.assert_called_once()
def test_up_already_running_exit_2(self, env, state_dir):
with patch("bridge.cli.TunnelManager") as mock_mgr_cls:
mock_mgr = MagicMock()
mock_mgr.is_running.return_value = True
mock_mgr_cls.return_value = mock_mgr
result = runner.invoke(app, ["up", "test-tunnel"], env=env)
assert result.exit_code == 2
class TestDownCommand:
def test_down_unknown_tunnel_exit_1(self, env):
result = runner.invoke(app, ["down", "nonexistent"], env=env)
assert result.exit_code == 1
def test_down_calls_manager_stop(self, env, state_dir):
with patch("bridge.cli.TunnelManager") as mock_mgr_cls:
mock_mgr = MagicMock()
mock_mgr.is_running.return_value = True
mock_mgr_cls.return_value = mock_mgr
result = runner.invoke(app, ["down", "test-tunnel"], env=env)
assert result.exit_code == 0
mock_mgr.stop.assert_called_once()
def test_down_not_running_exit_2(self, env, state_dir):
with patch("bridge.cli.TunnelManager") as mock_mgr_cls:
mock_mgr = MagicMock()
mock_mgr.is_running.return_value = False
mock_mgr_cls.return_value = mock_mgr
result = runner.invoke(app, ["down", "test-tunnel"], env=env)
assert result.exit_code == 2
class TestLogsCommand:
def test_logs_unknown_tunnel_exit_1(self, env):
result = runner.invoke(app, ["logs", "nonexistent"], env=env)
assert result.exit_code == 1
def test_logs_no_log_file_shows_empty(self, env, state_dir):
result = runner.invoke(app, ["logs", "test-tunnel"], env=env)
assert result.exit_code == 0
def test_logs_shows_events(self, env, state_dir):
import json as _json
state_dir.mkdir(parents=True, exist_ok=True)
log_file = state_dir / "test-tunnel.log"
log_file.write_text(
_json.dumps({
"timestamp": "2026-01-01T00:00:00+00:00",
"tunnel": "test-tunnel",
"actor": "operator.bernd",
"actor_class": "human",
"event": "bridge_started",
}) + "\n"
)
result = runner.invoke(app, ["logs", "test-tunnel"], env=env)
assert result.exit_code == 0
assert "bridge_started" in result.output
class TestRestartCommand:
def test_restart_unknown_tunnel_exit_1(self, env):
result = runner.invoke(app, ["restart", "nonexistent"], env=env)
assert result.exit_code == 1
def test_restart_calls_stop_then_start(self, env):
with patch("bridge.cli.TunnelManager") as mock_mgr_cls:
mock_mgr = MagicMock()
mock_mgr_cls.return_value = mock_mgr
call_order = []
mock_mgr.stop.side_effect = lambda: call_order.append("stop")
mock_mgr.start.side_effect = lambda: call_order.append("start")
result = runner.invoke(app, ["restart", "test-tunnel"], env=env)
assert result.exit_code == 0
assert call_order == ["stop", "start"]

130
tests/test_config.py Normal file
View File

@@ -0,0 +1,130 @@
"""Tests for config loading."""
import os
import textwrap
import pytest
from bridge.config import ConfigError, load_config
VALID_YAML = textwrap.dedent("""\
tunnels:
state-hub-coulombcore:
host: coulombcore.local
remote_port: 18000
local_port: 8000
ssh_user: ubuntu
ssh_key: ~/.ssh/id_ops
actor: agent.claude-coulombcore
health_check:
url: http://127.0.0.1:18000/health
interval_seconds: 30
timeout_seconds: 5
reconnect:
max_attempts: 0
backoff_initial: 5
backoff_max: 60
actors:
agent.claude-coulombcore:
class: automation
description: Claude Code agent on CoulombCore
operator.bernd:
class: human
description: Bernd Worsch
""")
@pytest.fixture
def config_file(tmp_path):
f = tmp_path / "tunnels.yaml"
f.write_text(VALID_YAML)
return f
def test_load_valid_config(config_file, monkeypatch):
monkeypatch.setenv("BRIDGE_CONFIG", str(config_file))
cfg = load_config()
assert "state-hub-coulombcore" in cfg.tunnels
t = cfg.tunnels["state-hub-coulombcore"]
assert t.host == "coulombcore.local"
assert t.remote_port == 18000
assert t.local_port == 8000
assert t.ssh_user == "ubuntu"
assert t.actor == "agent.claude-coulombcore"
def test_health_check_loaded(config_file, monkeypatch):
monkeypatch.setenv("BRIDGE_CONFIG", str(config_file))
cfg = load_config()
t = cfg.tunnels["state-hub-coulombcore"]
assert t.health_check is not None
assert t.health_check.url == "http://127.0.0.1:18000/health"
assert t.health_check.interval_seconds == 30
def test_reconnect_policy_loaded(config_file, monkeypatch):
monkeypatch.setenv("BRIDGE_CONFIG", str(config_file))
cfg = load_config()
t = cfg.tunnels["state-hub-coulombcore"]
assert t.reconnect.max_attempts == 0
assert t.reconnect.backoff_initial == 5
assert t.reconnect.backoff_max == 60
def test_actors_loaded(config_file, monkeypatch):
monkeypatch.setenv("BRIDGE_CONFIG", str(config_file))
cfg = load_config()
assert "agent.claude-coulombcore" in cfg.actors
a = cfg.actors["agent.claude-coulombcore"]
assert a.actor_class == "automation"
assert "operator.bernd" in cfg.actors
def test_missing_required_field_raises(tmp_path, monkeypatch):
f = tmp_path / "bad.yaml"
f.write_text(textwrap.dedent("""\
tunnels:
broken:
remote_port: 18000
local_port: 8000
actors: {}
"""))
monkeypatch.setenv("BRIDGE_CONFIG", str(f))
with pytest.raises(ConfigError, match="host"):
load_config()
def test_invalid_yaml_raises(tmp_path, monkeypatch):
f = tmp_path / "bad.yaml"
f.write_text("tunnels: [\nnot: valid: yaml")
monkeypatch.setenv("BRIDGE_CONFIG", str(f))
with pytest.raises(ConfigError):
load_config()
def test_missing_config_file_raises(tmp_path, monkeypatch):
monkeypatch.setenv("BRIDGE_CONFIG", str(tmp_path / "nonexistent.yaml"))
with pytest.raises(ConfigError, match="not found"):
load_config()
def test_tunnel_without_health_check(tmp_path, monkeypatch):
f = tmp_path / "tunnels.yaml"
f.write_text(textwrap.dedent("""\
tunnels:
simple:
host: host.local
remote_port: 9000
local_port: 8000
ssh_user: ubuntu
ssh_key: ~/.ssh/id_rsa
actor: operator.bernd
actors:
operator.bernd:
class: human
description: Bernd
"""))
monkeypatch.setenv("BRIDGE_CONFIG", str(f))
cfg = load_config()
assert cfg.tunnels["simple"].health_check is None

78
tests/test_health.py Normal file
View File

@@ -0,0 +1,78 @@
"""Tests for health checking."""
import pytest
from unittest.mock import MagicMock, patch, AsyncMock
from bridge.health import HealthChecker, HealthResult
class TestHealthResult:
def test_ok(self):
r = HealthResult(ok=True, status_code=200)
assert r.ok
assert r.status_code == 200
assert r.error is None
def test_failure(self):
r = HealthResult(ok=False, error="connection refused")
assert not r.ok
assert r.error == "connection refused"
class TestHealthChecker:
@pytest.mark.asyncio
async def test_check_ok(self):
checker = HealthChecker(url="http://127.0.0.1:18000/health", timeout_seconds=5)
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.raise_for_status = MagicMock()
with patch("httpx.AsyncClient") as mock_client_cls:
mock_client = AsyncMock()
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
mock_client.get = AsyncMock(return_value=mock_response)
mock_client_cls.return_value = mock_client
result = await checker.check()
assert result.ok
assert result.status_code == 200
@pytest.mark.asyncio
async def test_check_connection_error(self):
import httpx
checker = HealthChecker(url="http://127.0.0.1:19999/health", timeout_seconds=1)
with patch("httpx.AsyncClient") as mock_client_cls:
mock_client = AsyncMock()
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
mock_client.get = AsyncMock(side_effect=httpx.ConnectError("refused"))
mock_client_cls.return_value = mock_client
result = await checker.check()
assert not result.ok
assert result.error is not None
@pytest.mark.asyncio
async def test_check_http_error(self):
import httpx
checker = HealthChecker(url="http://127.0.0.1:18000/health", timeout_seconds=5)
mock_response = MagicMock()
mock_response.status_code = 503
mock_response.raise_for_status = MagicMock(
side_effect=httpx.HTTPStatusError("503", request=MagicMock(), response=mock_response)
)
with patch("httpx.AsyncClient") as mock_client_cls:
mock_client = AsyncMock()
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
mock_client.get = AsyncMock(return_value=mock_response)
mock_client_cls.return_value = mock_client
result = await checker.check()
assert not result.ok
assert result.status_code == 503

219
tests/test_integration.py Normal file
View File

@@ -0,0 +1,219 @@
"""Integration tests for OpsBridge."""
import json
import os
import textwrap
import time
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from bridge.config import load_config
from bridge.manager import TunnelManager
from bridge.models import BridgeState, ReconnectPolicy, TunnelConfig
from bridge.state import StateManager
MINIMAL_CONFIG = textwrap.dedent("""\
tunnels:
local-test:
host: 127.0.0.1
remote_port: 19000
local_port: 8000
ssh_user: testuser
ssh_key: ~/.ssh/id_rsa
actor: operator.bernd
reconnect:
max_attempts: 2
backoff_initial: 1
backoff_max: 2
actors:
operator.bernd:
class: human
description: Bernd
""")
@pytest.fixture
def config_file(tmp_path):
f = tmp_path / "tunnels.yaml"
f.write_text(MINIMAL_CONFIG)
return f
@pytest.fixture
def state_dir(tmp_path):
return tmp_path / "bridge"
@pytest.fixture
def tunnel_cfg():
return TunnelConfig(
name="local-test",
host="127.0.0.1",
remote_port=19000,
local_port=8000,
ssh_user="testuser",
ssh_key="~/.ssh/id_rsa",
actor="operator.bernd",
reconnect=ReconnectPolicy(max_attempts=2, backoff_initial=1, backoff_max=2),
)
class TestConfigRoundtrip:
def test_load_config_from_file(self, config_file, monkeypatch):
monkeypatch.setenv("BRIDGE_CONFIG", str(config_file))
cfg = load_config()
assert "local-test" in cfg.tunnels
t = cfg.tunnels["local-test"]
assert t.host == "127.0.0.1"
assert t.reconnect.max_attempts == 2
assert t.reconnect.backoff_initial == 1
class TestStateRoundtrip:
def test_state_persists_across_manager_instances(self, state_dir, tunnel_cfg):
mgr1 = TunnelManager(tunnel_cfg, state_dir=state_dir)
mgr1._state.write_state(tunnel_cfg.name, BridgeState.CONNECTED)
mgr2 = TunnelManager(tunnel_cfg, state_dir=state_dir)
assert mgr2.get_state() == BridgeState.CONNECTED
def test_stale_pid_cleanup(self, state_dir, tunnel_cfg):
sm = StateManager(state_dir=state_dir)
sm.write_pid(tunnel_cfg.name, 999999) # guaranteed not alive
sm.write_state(tunnel_cfg.name, BridgeState.CONNECTED)
# is_running should return False for dead pid
mgr = TunnelManager(tunnel_cfg, state_dir=state_dir)
assert not mgr.is_running()
class TestReconnectLoop:
def test_reconnect_loop_gives_up_after_max_attempts(self, state_dir, tunnel_cfg):
"""Manager should set FAILED state after exhausting max_attempts."""
mgr = TunnelManager(tunnel_cfg, state_dir=state_dir)
attempt_count = [0]
def fake_popen(cmd, **kwargs):
proc = MagicMock()
proc.poll.return_value = 1 # immediately "dead"
proc.returncode = 1
attempt_count[0] += 1
return proc
with patch("subprocess.Popen", side_effect=fake_popen), \
patch("time.sleep"): # skip sleeps for speed
mgr._run_loop()
assert attempt_count[0] >= 1
assert mgr.get_state() == BridgeState.FAILED
def test_reconnect_logs_events(self, state_dir, tunnel_cfg):
"""Audit log should contain reconnect events."""
mgr = TunnelManager(tunnel_cfg, state_dir=state_dir)
def fake_popen(cmd, **kwargs):
proc = MagicMock()
proc.poll.return_value = 1
proc.returncode = 1
return proc
with patch("subprocess.Popen", side_effect=fake_popen), \
patch("time.sleep"):
mgr._run_loop()
events = mgr._audit.read_events(tunnel_cfg.name)
event_types = [e["event"] for e in events]
assert "bridge_started" in event_types or "bridge_reconnecting" in event_types or "bridge_disconnected" in event_types
class TestHealthCheckDegradedPath:
def test_degraded_state_on_health_failure(self, state_dir):
"""Health check failure sets state to DEGRADED."""
from bridge.health import HealthChecker, HealthResult
hc_cfg = MagicMock()
hc_cfg.url = "http://127.0.0.1:19001/health"
hc_cfg.interval_seconds = 0
hc_cfg.timeout_seconds = 1
tunnel_cfg = TunnelConfig(
name="hc-test",
host="127.0.0.1",
remote_port=19001,
local_port=8001,
ssh_user="u",
ssh_key="k",
actor="operator.bernd",
reconnect=ReconnectPolicy(max_attempts=1, backoff_initial=1, backoff_max=1),
health_check=hc_cfg,
)
mgr = TunnelManager(tunnel_cfg, state_dir=state_dir)
proc_call_count = [0]
def fake_popen(cmd, **kwargs):
proc = MagicMock()
# First call: "alive" for 1 health check cycle then dies
proc_call_count[0] += 1
if proc_call_count[0] == 1:
# Poll returns None (alive) once then dies
poll_calls = [None, 1]
proc.poll.side_effect = poll_calls + [1] * 100
proc.returncode = 1
else:
proc.poll.return_value = 1
proc.returncode = 1
return proc
failed_result = HealthResult(ok=False, error="connection refused")
recovered_result = HealthResult(ok=True, status_code=200)
import asyncio
async def fake_check_failing():
return failed_result
with patch("subprocess.Popen", side_effect=fake_popen), \
patch("time.sleep"), \
patch("bridge.manager.HealthChecker") as mock_hc_cls:
mock_checker = MagicMock()
mock_checker.check = MagicMock(side_effect=lambda: failed_result)
# Use asyncio.run compatibility
mock_hc_cls.return_value = mock_checker
with patch("asyncio.run", side_effect=lambda coro: failed_result):
mgr._run_loop()
# Should have set degraded at some point — check audit log
events = mgr._audit.read_events("hc-test")
event_types = [e["event"] for e in events]
assert "health_check_failed" in event_types or "bridge_disconnected" in event_types
class TestAuditTrail:
def test_full_lifecycle_logged(self, state_dir, tunnel_cfg):
"""A start + immediate-exit SSH produces at minimum started + disconnected events."""
mgr = TunnelManager(tunnel_cfg, state_dir=state_dir)
def fake_popen(cmd, **kwargs):
proc = MagicMock()
proc.poll.return_value = 1
proc.returncode = 1
return proc
with patch("subprocess.Popen", side_effect=fake_popen), \
patch("time.sleep"):
mgr._run_loop()
events = mgr._audit.read_events(tunnel_cfg.name)
assert len(events) >= 2
# Each event has required fields
for e in events:
assert "timestamp" in e
assert "tunnel" in e
assert "actor" in e
assert "event" in e

109
tests/test_manager.py Normal file
View File

@@ -0,0 +1,109 @@
"""Tests for TunnelManager."""
import os
import signal
import time
from pathlib import Path
from unittest.mock import MagicMock, patch, call
import pytest
from bridge.models import BridgeState, ReconnectPolicy, TunnelConfig
from bridge.manager import TunnelManager, build_ssh_command
@pytest.fixture
def tunnel_cfg():
return TunnelConfig(
name="test-tunnel",
host="host.local",
remote_port=18000,
local_port=8000,
ssh_user="ubuntu",
ssh_key="~/.ssh/id_ops",
actor="operator.bernd",
reconnect=ReconnectPolicy(max_attempts=3, backoff_initial=1, backoff_max=5),
)
@pytest.fixture
def state_dir(tmp_path):
return tmp_path / "bridge"
class TestBuildSshCommand:
def test_basic_command(self, tunnel_cfg):
cmd = build_ssh_command(tunnel_cfg)
assert cmd[0] == "ssh"
assert "-N" in cmd
assert "-R" in cmd
assert "18000:127.0.0.1:8000" in cmd
assert "-i" in cmd
assert "ubuntu@host.local" in cmd
def test_server_alive_options(self, tunnel_cfg):
cmd = build_ssh_command(tunnel_cfg)
assert "-o" in cmd
assert "ServerAliveInterval=10" in cmd
assert "ExitOnForwardFailure=yes" in cmd
def test_ssh_key_expanded(self, tunnel_cfg):
cmd = build_ssh_command(tunnel_cfg)
key_idx = cmd.index("-i") + 1
assert not cmd[key_idx].startswith("~")
class TestTunnelManager:
def test_get_state_initial(self, tunnel_cfg, state_dir):
mgr = TunnelManager(tunnel_cfg, state_dir=state_dir)
assert mgr.get_state() == BridgeState.STOPPED
def test_stop_when_not_running_is_noop(self, tunnel_cfg, state_dir):
mgr = TunnelManager(tunnel_cfg, state_dir=state_dir)
# Should not raise
mgr.stop()
assert mgr.get_state() == BridgeState.STOPPED
def test_stop_kills_pid(self, tunnel_cfg, state_dir):
mgr = TunnelManager(tunnel_cfg, state_dir=state_dir)
# Write a fake PID of our own process to simulate running
mgr._state.write_pid(tunnel_cfg.name, os.getpid())
mgr._state.write_state(tunnel_cfg.name, BridgeState.CONNECTED)
with patch("os.kill") as mock_kill:
mgr.stop()
# Should have sent SIGTERM
mock_kill.assert_any_call(os.getpid(), signal.SIGTERM)
assert mgr.get_state() == BridgeState.STOPPED
def test_backoff_calculation(self, tunnel_cfg, state_dir):
mgr = TunnelManager(tunnel_cfg, state_dir=state_dir)
# First backoff = initial
assert mgr._next_backoff(0) == 1
# Doubles each time up to max
assert mgr._next_backoff(1) == 2
assert mgr._next_backoff(2) == 4
assert mgr._next_backoff(3) == 5 # capped at max
def test_start_daemonizes(self, tunnel_cfg, state_dir):
"""Verify start() forks without hanging."""
mgr = TunnelManager(tunnel_cfg, state_dir=state_dir)
# We can't actually fork in tests; verify state transitions via mock
with patch("subprocess.Popen") as mock_popen, \
patch("os.fork", return_value=1234) as mock_fork, \
patch("os.setsid"), \
patch("os._exit"):
mock_proc = MagicMock()
mock_proc.pid = 9999
mock_popen.return_value = mock_proc
# When fork returns non-zero we're the parent — just check PID written
mgr.start()
# After start the state should be STARTING (set before fork)
# and PID file should exist (written in parent branch)
def test_is_running_false_initially(self, tunnel_cfg, state_dir):
mgr = TunnelManager(tunnel_cfg, state_dir=state_dir)
assert not mgr.is_running()

75
tests/test_models.py Normal file
View File

@@ -0,0 +1,75 @@
"""Tests for domain models."""
import pytest
from bridge.models import (
ActorInfo,
BridgeState,
HealthCheckConfig,
ReconnectPolicy,
TunnelConfig,
)
class TestBridgeState:
def test_all_states_defined(self):
states = {s.value for s in BridgeState}
assert states == {"stopped", "starting", "connected", "degraded", "reconnecting", "failed"}
def test_state_is_string(self):
assert BridgeState.STOPPED == "stopped"
class TestReconnectPolicy:
def test_defaults(self):
p = ReconnectPolicy()
assert p.max_attempts == 0
assert p.backoff_initial == 5
assert p.backoff_max == 60
def test_custom(self):
p = ReconnectPolicy(max_attempts=3, backoff_initial=2, backoff_max=30)
assert p.max_attempts == 3
class TestHealthCheckConfig:
def test_required_url(self):
h = HealthCheckConfig(url="http://127.0.0.1:18000/health")
assert h.url == "http://127.0.0.1:18000/health"
assert h.interval_seconds == 30
assert h.timeout_seconds == 5
class TestTunnelConfig:
def test_minimal(self):
t = TunnelConfig(
name="test-tunnel",
host="host.local",
remote_port=18000,
local_port=8000,
ssh_user="ubuntu",
ssh_key="~/.ssh/id_ops",
actor="operator.bernd",
)
assert t.name == "test-tunnel"
assert t.health_check is None
assert isinstance(t.reconnect, ReconnectPolicy)
def test_with_health_check(self):
hc = HealthCheckConfig(url="http://127.0.0.1:18000/health")
t = TunnelConfig(
name="test",
host="h",
remote_port=1,
local_port=2,
ssh_user="u",
ssh_key="k",
actor="a",
health_check=hc,
)
assert t.health_check is hc
class TestActorInfo:
def test_fields(self):
a = ActorInfo(name="operator.bernd", actor_class="human", description="Bernd")
assert a.name == "operator.bernd"
assert a.actor_class == "human"

69
tests/test_state.py Normal file
View File

@@ -0,0 +1,69 @@
"""Tests for state management."""
import os
import signal
import pytest
from bridge.models import BridgeState
from bridge.state import StateManager
@pytest.fixture
def state_dir(tmp_path):
return tmp_path / "bridge"
@pytest.fixture
def mgr(state_dir):
return StateManager(state_dir=state_dir)
class TestStateManager:
def test_read_state_no_file_returns_stopped(self, mgr):
assert mgr.read_state("my-tunnel") == BridgeState.STOPPED
def test_write_and_read_state(self, mgr):
mgr.write_state("my-tunnel", BridgeState.CONNECTED)
assert mgr.read_state("my-tunnel") == BridgeState.CONNECTED
def test_state_roundtrip_all_values(self, mgr):
for state in BridgeState:
mgr.write_state("t", state)
assert mgr.read_state("t") == state
def test_write_pid(self, mgr):
# Write a live PID (our own process) so read_pid can confirm it's alive
pid = os.getpid()
mgr.write_pid("my-tunnel", pid)
assert mgr.read_pid("my-tunnel") == pid
def test_read_pid_no_file_returns_none(self, mgr):
assert mgr.read_pid("nonexistent") is None
def test_stale_pid_returns_none(self, mgr):
# PID 999999 almost certainly does not exist
mgr.write_pid("my-tunnel", 999999)
assert mgr.read_pid("my-tunnel") is None
def test_current_pid_is_alive(self, mgr):
mgr.write_pid("my-tunnel", os.getpid())
assert mgr.read_pid("my-tunnel") == os.getpid()
def test_clear_pid(self, mgr):
mgr.write_pid("my-tunnel", os.getpid())
mgr.clear_pid("my-tunnel")
assert mgr.read_pid("my-tunnel") is None
def test_state_dir_created_on_write(self, state_dir):
assert not state_dir.exists()
mgr = StateManager(state_dir=state_dir)
mgr.write_state("t", BridgeState.STOPPED)
assert state_dir.exists()
def test_is_running_false_when_stopped(self, mgr):
assert not mgr.is_running("my-tunnel")
def test_is_running_true_when_pid_alive(self, mgr):
mgr.write_pid("my-tunnel", os.getpid())
mgr.write_state("my-tunnel", BridgeState.CONNECTED)
assert mgr.is_running("my-tunnel")