generated from coulomb/repo-seed
Adds the OpsCatalog subsystem: a Git-backed YAML catalog of operations domains, targets, bridges, and actor classes. Includes catalog loader, cross-reference validator, bridge resolver (inline-first, catalog fallback), and new CLI commands: `bridge targets`, `bridge targets show`, `bridge catalog list/validate/show`. Updates `up/down/restart` to resolve bridge names from the catalog when not defined inline. 142 tests, all green. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
70 lines
1.7 KiB
Python
70 lines
1.7 KiB
Python
"""Domain models for OpsCatalog."""
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
from typing import Dict, List, Optional
|
|
|
|
from bridge.models import HealthCheckConfig, ReconnectPolicy, TunnelConfig
|
|
|
|
|
|
@dataclass
class CatalogDomain:
    """An operations domain entry in the catalog.

    Attributes:
        id: Identifier of the domain.
        name: Name of the domain.
        description: Free-text description; defaults to an empty string.
        environment: Environment string; defaults to an empty string.
    """

    id: str
    name: str
    description: str = ""
    environment: str = ""
|
|
|
|
|
|
@dataclass
class CatalogTarget:
    """A target entry in the catalog.

    Attributes:
        id: Identifier of the target.
        domain: Domain reference for this target (presumably a
            ``CatalogDomain`` id -- confirm against the validator).
        kind: Kind of target, as a plain string.
        description: Free-text description; defaults to an empty string.
        reachable_via: List of strings naming how the target can be reached
            (presumably bridge ids -- confirm against the validator).
            Each instance gets its own fresh empty list by default.
    """

    id: str
    domain: str
    kind: str
    description: str = ""
    # default_factory avoids sharing one mutable list across instances.
    reachable_via: List[str] = field(default_factory=list)
|
|
|
|
|
|
@dataclass
class CatalogBridge:
    """A bridge definition stored in the catalog.

    Attributes:
        id: Identifier of the bridge; used as the tunnel name when the
            bridge is converted with :meth:`to_tunnel_config`.
        domain: Domain reference for this bridge.
        target: Target reference for this bridge.
        host: Host passed through to the tunnel configuration.
        remote_port: Remote port passed through to the tunnel configuration.
        local_port: Local port passed through to the tunnel configuration.
        ssh_user: SSH user passed through to the tunnel configuration.
        ssh_key: SSH key passed through to the tunnel configuration.
        actor: Actor passed through to the tunnel configuration.
        description: Free-text description; defaults to an empty string.
        access_method: Access method string; defaults to ``"ssh-reverse"``.
        health_check: Optional health-check configuration.
        reconnect: Optional reconnect policy; ``None`` means "use the
            default policy" when converting to a tunnel configuration.
    """

    id: str
    domain: str
    target: str
    host: str
    remote_port: int
    local_port: int
    ssh_user: str
    ssh_key: str
    actor: str
    description: str = ""
    access_method: str = "ssh-reverse"
    health_check: Optional[HealthCheckConfig] = None
    reconnect: Optional[ReconnectPolicy] = None

    def to_tunnel_config(self) -> TunnelConfig:
        """Build a ``TunnelConfig`` from this catalog bridge.

        The bridge ``id`` becomes the tunnel ``name``. If ``reconnect`` is
        ``None`` a default-constructed ``ReconnectPolicy`` is supplied;
        ``health_check`` is forwarded as-is (possibly ``None``). The
        ``domain``, ``target``, ``description`` and ``access_method`` fields
        are not forwarded.

        Returns:
            A new ``TunnelConfig`` populated from this bridge's fields.
        """
        return TunnelConfig(
            name=self.id,
            host=self.host,
            remote_port=self.remote_port,
            local_port=self.local_port,
            ssh_user=self.ssh_user,
            ssh_key=self.ssh_key,
            actor=self.actor,
            # Explicit None check: only a missing policy falls back to the default.
            reconnect=self.reconnect if self.reconnect is not None else ReconnectPolicy(),
            health_check=self.health_check,
        )
|
|
|
|
|
|
@dataclass
class ActorClass:
    """An actor class entry in the catalog.

    Attributes:
        id: Identifier of the actor entry.
        actor_class: Class of the actor, as a plain string.
        description: Free-text description; defaults to an empty string.
    """

    id: str
    actor_class: str
    description: str = ""
|
|
|
|
|
|
@dataclass
class Catalog:
    """Container for all catalog entries, grouped by entry type.

    Each attribute is a dictionary with string keys (presumably each entry's
    ``id`` -- confirm against the loader). Every instance starts with its own
    empty dictionaries.

    Attributes:
        domains: ``CatalogDomain`` entries.
        targets: ``CatalogTarget`` entries.
        bridges: ``CatalogBridge`` entries.
        actors: ``ActorClass`` entries.
    """

    # default_factory gives each Catalog independent dictionaries.
    domains: Dict[str, CatalogDomain] = field(default_factory=dict)
    targets: Dict[str, CatalogTarget] = field(default_factory=dict)
    bridges: Dict[str, CatalogBridge] = field(default_factory=dict)
    actors: Dict[str, ActorClass] = field(default_factory=dict)
|