Files
ops-bridge/src/bridge/state.py
tegwick a55c685f89 feat(diagnostics): end-to-end tunnel check, stale state detection, MCP extensions
- diagnostics.py: TunnelCheckResult with SSH process liveness, port
  probe, and optional API health check; check_tunnel / check_all_tunnels
- cli.py: bridge status shows LIVE column and [STALE] marker when state
  says connected but PID is dead; bridge check wired to diagnostics
- state.py: read_raw_pid helper; _pid_alive exported for reuse
- capabilities.py: capabilities registry stubs
- mcp_server/server.py: expose check_tunnel and tunnel capabilities
  over MCP
- SCOPE.md: rapid orientation document
- workplans/OPS-WP-0001-diagnostics.md: workplan backing this feature
- tests: 207 passing (test_cli, test_mcp, test_diagnostics)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-21 15:07:47 +01:00

84 lines
2.4 KiB
Python

"""State file management for OpsBridge."""
from __future__ import annotations
import os
from pathlib import Path
from typing import Optional
from bridge.models import BridgeState
def _default_state_dir() -> Path:
    """Return the default state directory: ``~/.local/state/bridge``.

    The path is only computed here; nothing is created on disk.
    """
    home = Path.home()
    return home.joinpath(".local", "state", "bridge")
class StateManager:
    """Persist per-bridge state and PID files inside one state directory.

    Every bridge ``name`` maps to two files in the directory:
    ``<name>.state`` holding a ``BridgeState`` value and ``<name>.pid``
    holding a decimal process id.
    """

    def __init__(self, state_dir: Optional[Path] = None):
        """Use *state_dir* when given (and truthy), else the default directory.

        The directory is not created here; the write methods create it
        on demand.
        """
        self._dir = Path(state_dir) if state_dir else _default_state_dir()

    def _ensure_dir(self) -> None:
        """Create the state directory, including parents, if missing."""
        self._dir.mkdir(parents=True, exist_ok=True)

    def _state_path(self, name: str) -> Path:
        """Return the path of the state file for bridge *name*."""
        return self._dir / f"{name}.state"

    def _pid_path(self, name: str) -> Path:
        """Return the path of the PID file for bridge *name*."""
        return self._dir / f"{name}.pid"

    def read_state(self, name: str) -> BridgeState:
        """Return the recorded state of bridge *name*.

        Falls back to ``BridgeState.STOPPED`` when the state file is
        missing or its content is not a valid ``BridgeState`` value.
        """
        # EAFP instead of exists()+read: the file may be removed between
        # the two calls, which previously surfaced as FileNotFoundError.
        # Reading inside the try also maps undecodable bytes
        # (UnicodeDecodeError is a ValueError) to STOPPED, like any other
        # invalid content.
        try:
            text = self._state_path(name).read_text().strip()
            return BridgeState(text)
        except (FileNotFoundError, NotADirectoryError, ValueError):
            return BridgeState.STOPPED

    def write_state(self, name: str, state: BridgeState) -> None:
        """Record *state* for bridge *name*, creating the directory if needed."""
        self._ensure_dir()
        self._state_path(name).write_text(state.value)

    def read_pid(self, name: str) -> Optional[int]:
        """Return the recorded PID of bridge *name* if that process is alive.

        Returns ``None`` when the PID file is absent or invalid, or when
        ``_pid_alive`` reports the process as not alive.
        """
        # Share the parsing rules with read_raw_pid so the two can never
        # disagree about what counts as a valid PID file.
        pid = self.read_raw_pid(name)
        if pid is not None and _pid_alive(pid):
            return pid
        return None

    def read_raw_pid(self, name: str) -> Optional[int]:
        """Read PID from file without liveness check. Returns None if file absent/invalid."""
        # A missing file raises FileNotFoundError, which is an OSError, so
        # no separate existence check is required.
        try:
            return int(self._pid_path(name).read_text().strip())
        except (ValueError, OSError):
            return None

    def write_pid(self, name: str, pid: int) -> None:
        """Record *pid* for bridge *name*, creating the directory if needed."""
        self._ensure_dir()
        self._pid_path(name).write_text(str(pid))

    def clear_pid(self, name: str) -> None:
        """Remove the PID file of bridge *name*; a missing file is not an error."""
        # EAFP: an exists() check followed by unlink() raises if the file
        # is removed by someone else in between.
        try:
            self._pid_path(name).unlink()
        except FileNotFoundError:
            pass

    def is_running(self, name: str) -> bool:
        """Return True when bridge *name* has a PID file naming a live process."""
        return self.read_pid(name) is not None
def _pid_alive(pid: int) -> bool:
    """Return True if a process with the given PID exists and can be signalled.

    Probes the process with signal 0, which performs the existence and
    permission checks without delivering a signal.

    Returns False when:
      * *pid* is zero or negative -- for ``kill`` those values address
        process groups or every process rather than a single process, so
        a corrupt PID file could otherwise be reported as alive;
      * no such process exists (``ProcessLookupError``);
      * the process exists but may not be signalled by this user
        (``PermissionError``);
      * *pid* is too large to be passed to the OS (``OverflowError``).

    NOTE(review): treating ``PermissionError`` as "not alive" keeps the
    existing behaviour -- presumably because such a process cannot be one
    this user started; confirm that is the intent.
    NOTE(review): POSIX only. On Windows ``os.kill`` with a signal other
    than the CTRL_* events terminates the target process.
    """
    if pid <= 0:
        return False
    try:
        os.kill(pid, 0)
    except (ProcessLookupError, PermissionError, OverflowError):
        return False
    return True