generated from coulomb/repo-seed
- ActorType enum (adm/agt/atm) replaces actor_class string; config validates naming convention (adm-*/agt-*/atm-*) with hard ConfigError on mismatch; legacy 'human'/'automation' values accepted with DeprecationWarning - cert_command: pluggable shell string run before each SSH launch; cert written to state dir; -i cert appended to SSH command alongside -i key - TTL-aware cert refresh: parses Valid-to via ssh-keygen -L; pre-emptive restart 5 min before expiry (no backoff, no attempt increment); CERT_EXPIRING logged - CertAcquisitionError: cert failures trigger normal backoff/retry loop - cert_identity: Key ID parsed from cert and recorded in BRIDGE_CONNECTED event - bridge cert-status: new CLI command; exit 1 on expired cert; --json flag - 233 tests passing, ruff clean Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
623 lines
23 KiB
Python
623 lines
23 KiB
Python
"""Tests for OpsBridge MCP server tools (FastMCP in-process client).
|
|
|
|
Uses FastMCP's Client(mcp_app) context manager — no network, no subprocess.
|
|
All tests are async; asyncio_mode = "auto" in pyproject.toml.
|
|
|
|
FastMCP 3.x returns results in result.content[0].text as a JSON string.
|
|
Use _data(result) to extract and parse.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import textwrap
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from bridge.mcp_server.server import mcp
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _data(result) -> list | dict:
|
|
"""Extract and parse JSON from a FastMCP CallToolResult.
|
|
|
|
FastMCP 3.x: non-empty results are in result.content[0].text.
|
|
Empty list/dict returns come back with empty content; result.data holds them.
|
|
"""
|
|
if not result.content:
|
|
return result.data # empty list/dict
|
|
text = result.content[0].text
|
|
return json.loads(text)
|
|
|
|
|
|
def _write_config(tmp_path: Path, content: str) -> Path:
    """Write *content* to ``tunnels.yaml`` inside *tmp_path* and return its path.

    The file is written as UTF-8 explicitly so the bytes on disk do not
    depend on the platform's locale encoding.
    """
    config_file = tmp_path / "tunnels.yaml"
    config_file.write_text(content, encoding="utf-8")
    return config_file
|
|
|
|
|
|
def _simple_config(tmp_path: Path) -> Path:
    """Write a config with one tunnel and one ``adm`` actor; no catalog_path is set."""
    yaml_text = textwrap.dedent("""\
        tunnels:
          test-tunnel:
            host: host.local
            remote_port: 18000
            local_port: 8000
            ssh_user: ubuntu
            ssh_key: ~/.ssh/id_ops
            actor: adm-bernd
        actors:
          adm-bernd:
            class: adm
            description: Bernd
        """)
    return _write_config(tmp_path, yaml_text)
|
|
|
|
|
|
def _catalog_config(tmp_path: Path, catalog_dir: Path) -> Path:
    """Write the single-tunnel config with ``catalog_path`` set to *catalog_dir*."""
    yaml_text = textwrap.dedent(f"""\
        tunnels:
          test-tunnel:
            host: host.local
            remote_port: 18000
            local_port: 8000
            ssh_user: ubuntu
            ssh_key: ~/.ssh/id_ops
            actor: adm-bernd
        actors:
          adm-bernd:
            class: adm
            description: Bernd
        catalog_path: {catalog_dir}
        """)
    return _write_config(tmp_path, yaml_text)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.fixture
def env_simple(tmp_path, monkeypatch):
    """Export BRIDGE_CONFIG (single-tunnel config) and BRIDGE_STATE_DIR under tmp_path."""
    config_file = _simple_config(tmp_path)
    state_dir = tmp_path / "state"
    monkeypatch.setenv("BRIDGE_CONFIG", str(config_file))
    monkeypatch.setenv("BRIDGE_STATE_DIR", str(state_dir))
|
|
|
|
|
|
@pytest.fixture
def env_catalog(tmp_path, catalog_dir, monkeypatch):
    """Export BRIDGE_CONFIG for a config whose catalog_path is *catalog_dir*.

    BRIDGE_STATE_DIR is pointed at ``tmp_path / "state"``.
    """
    config_file = _catalog_config(tmp_path, catalog_dir)
    state_dir = tmp_path / "state"
    monkeypatch.setenv("BRIDGE_CONFIG", str(config_file))
    monkeypatch.setenv("BRIDGE_STATE_DIR", str(state_dir))
|
|
|
|
|
|
@pytest.fixture
def env_no_catalog(tmp_path, monkeypatch):
    """Export BRIDGE_CONFIG for the simple config, which defines no catalog_path.

    BRIDGE_STATE_DIR is pointed at ``tmp_path / "state"``.
    """
    config_file = _simple_config(tmp_path)
    state_dir = tmp_path / "state"
    monkeypatch.setenv("BRIDGE_CONFIG", str(config_file))
    monkeypatch.setenv("BRIDGE_STATE_DIR", str(state_dir))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# bridge_status
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestMcpBridgeStatus:
    """Tests for the ``bridge_status`` MCP tool."""

    @pytest.mark.capability("bridge_status")
    @pytest.mark.access_mode("mcp")
    async def test_bridge_status_returns_list(self, env_simple):
        """bridge_status returns a single row describing the configured tunnel."""
        from fastmcp import Client

        async with Client(mcp) as client:
            response = await client.call_tool("bridge_status", {})
        rows = _data(response)
        assert isinstance(rows, list)
        assert len(rows) == 1
        row = rows[0]
        assert row["tunnel"] == "test-tunnel"
        for field in ("state", "actor", "host"):
            assert field in row

    async def test_bridge_status_bad_config(self, tmp_path, monkeypatch):
        """bridge_status reports an error row when the config file does not exist."""
        monkeypatch.setenv("BRIDGE_CONFIG", str(tmp_path / "nonexistent.yaml"))
        # Keep the test hermetic: like every other environment set up in this
        # module, never let a BRIDGE_STATE_DIR from the caller's shell leak in.
        monkeypatch.setenv("BRIDGE_STATE_DIR", str(tmp_path / "state"))
        from fastmcp import Client

        async with Client(mcp) as client:
            response = await client.call_tool("bridge_status", {})
        rows = _data(response)
        assert isinstance(rows, list)
        assert "error" in rows[0]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# bridge_up
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestMcpBridgeUp:
    """Tests for the ``bridge_up`` MCP tool with ``bridge.manager.TunnelManager`` patched."""

    @staticmethod
    def _manager(running: bool) -> MagicMock:
        """Build a stand-in manager whose ``is_running()`` returns *running*."""
        manager = MagicMock()
        manager.is_running.return_value = running
        return manager

    @pytest.mark.capability("bridge_up")
    @pytest.mark.access_mode("mcp")
    async def test_bridge_up_starts_tunnel(self, env_simple):
        """A tunnel that is not running is listed under ``started``."""
        with patch("bridge.manager.TunnelManager", return_value=self._manager(False)):
            from fastmcp import Client

            async with Client(mcp) as client:
                response = await client.call_tool("bridge_up", {"tunnel": "test-tunnel"})

        payload = _data(response)
        assert "started" in payload
        assert "test-tunnel" in payload["started"]

    async def test_bridge_up_already_running(self, env_simple):
        """A tunnel that is already running is listed under ``already_running``."""
        with patch("bridge.manager.TunnelManager", return_value=self._manager(True)):
            from fastmcp import Client

            async with Client(mcp) as client:
                response = await client.call_tool("bridge_up", {"tunnel": "test-tunnel"})

        payload = _data(response)
        assert "already_running" in payload
        assert "test-tunnel" in payload["already_running"]

    async def test_bridge_up_unknown_tunnel(self, env_simple):
        """An unknown tunnel name yields an ``error`` key."""
        from fastmcp import Client

        async with Client(mcp) as client:
            response = await client.call_tool("bridge_up", {"tunnel": "nonexistent"})
        payload = _data(response)
        assert "error" in payload

    async def test_bridge_up_all_tunnels(self, env_simple):
        """Calling without a tunnel argument starts the configured tunnel."""
        with patch("bridge.manager.TunnelManager", return_value=self._manager(False)):
            from fastmcp import Client

            async with Client(mcp) as client:
                response = await client.call_tool("bridge_up", {})

        payload = _data(response)
        assert "started" in payload
        assert "test-tunnel" in payload["started"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# bridge_down
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestMcpBridgeDown:
    """Tests for the ``bridge_down`` MCP tool with ``bridge.manager.TunnelManager`` patched."""

    @staticmethod
    def _manager(running: bool) -> MagicMock:
        """Build a stand-in manager whose ``is_running()`` returns *running*."""
        manager = MagicMock()
        manager.is_running.return_value = running
        return manager

    @pytest.mark.capability("bridge_down")
    @pytest.mark.access_mode("mcp")
    async def test_bridge_down_stops_tunnel(self, env_simple):
        """A running tunnel is listed under ``stopped``."""
        with patch("bridge.manager.TunnelManager", return_value=self._manager(True)):
            from fastmcp import Client

            async with Client(mcp) as client:
                response = await client.call_tool("bridge_down", {"tunnel": "test-tunnel"})

        payload = _data(response)
        assert "stopped" in payload
        assert "test-tunnel" in payload["stopped"]

    async def test_bridge_down_not_running(self, env_simple):
        """A tunnel that is not running is listed under ``not_running``."""
        with patch("bridge.manager.TunnelManager", return_value=self._manager(False)):
            from fastmcp import Client

            async with Client(mcp) as client:
                response = await client.call_tool("bridge_down", {"tunnel": "test-tunnel"})

        payload = _data(response)
        assert "not_running" in payload
        assert "test-tunnel" in payload["not_running"]

    async def test_bridge_down_unknown_tunnel(self, env_simple):
        """An unknown tunnel name yields an ``error`` key."""
        from fastmcp import Client

        async with Client(mcp) as client:
            response = await client.call_tool("bridge_down", {"tunnel": "nonexistent"})
        payload = _data(response)
        assert "error" in payload
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# bridge_restart
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestMcpBridgeRestart:
    """Tests for the ``bridge_restart`` MCP tool."""

    @pytest.mark.capability("bridge_restart")
    @pytest.mark.access_mode("mcp")
    async def test_bridge_restart_calls_stop_then_start(self, env_simple):
        """Restart invokes ``stop()`` before ``start()`` on the patched manager."""
        sequence = []
        manager = MagicMock()
        # Each side effect records its own name so the call order can be checked.
        manager.stop.side_effect = lambda: sequence.append("stop")
        manager.start.side_effect = lambda: sequence.append("start")

        with patch("bridge.manager.TunnelManager", return_value=manager):
            from fastmcp import Client

            async with Client(mcp) as client:
                response = await client.call_tool("bridge_restart", {"tunnel": "test-tunnel"})

        payload = _data(response)
        assert "restarted" in payload
        assert "test-tunnel" in payload["restarted"]
        assert sequence == ["stop", "start"]

    async def test_bridge_restart_unknown_tunnel(self, env_simple):
        """An unknown tunnel name yields an ``error`` key."""
        from fastmcp import Client

        async with Client(mcp) as client:
            response = await client.call_tool("bridge_restart", {"tunnel": "nonexistent"})
        payload = _data(response)
        assert "error" in payload
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# bridge_logs
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestMcpBridgeLogs:
    """Tests for the ``bridge_logs`` MCP tool."""

    @pytest.mark.capability("bridge_logs")
    @pytest.mark.access_mode("mcp")
    async def test_bridge_logs_returns_list(self, env_simple, tmp_path):
        """A one-line JSON log file in the state dir comes back as one entry."""
        entry = {
            "timestamp": "2026-01-01T00:00:00+00:00",
            "tunnel": "test-tunnel",
            "actor": "adm-bernd",
            "actor_type": "adm",
            "event": "bridge_started",
        }
        state_dir = tmp_path / "state"
        state_dir.mkdir(parents=True, exist_ok=True)
        (state_dir / "test-tunnel.log").write_text(json.dumps(entry) + "\n")

        from fastmcp import Client

        async with Client(mcp) as client:
            response = await client.call_tool("bridge_logs", {"tunnel": "test-tunnel"})
        entries = _data(response)
        assert isinstance(entries, list)
        assert len(entries) == 1
        assert entries[0]["event"] == "bridge_started"

    async def test_bridge_logs_unknown_tunnel(self, env_simple):
        """An unknown tunnel name yields a list whose first item has an ``error`` key."""
        from fastmcp import Client

        async with Client(mcp) as client:
            response = await client.call_tool("bridge_logs", {"tunnel": "nonexistent"})
        entries = _data(response)
        assert isinstance(entries, list)
        assert "error" in entries[0]

    async def test_bridge_logs_empty(self, env_simple):
        """With no log file written, the tool returns an empty list."""
        from fastmcp import Client

        async with Client(mcp) as client:
            response = await client.call_tool("bridge_logs", {"tunnel": "test-tunnel"})
        entries = _data(response)
        assert isinstance(entries, list)
        assert entries == []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# catalog_list_targets
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestMcpCatalogListTargets:
    """Tests for the ``catalog_list_targets`` MCP tool."""

    @pytest.mark.capability("catalog_list_targets")
    @pytest.mark.access_mode("mcp")
    async def test_catalog_list_targets_returns_list(self, env_catalog):
        """The unfiltered listing contains the ``state-hub`` target."""
        from fastmcp import Client

        async with Client(mcp) as client:
            response = await client.call_tool("catalog_list_targets", {})
        targets = _data(response)
        assert isinstance(targets, list)
        assert any(t["id"] == "state-hub" for t in targets)

    async def test_catalog_list_targets_domain_filter(self, env_catalog):
        """Filtering by domain returns only targets of that domain."""
        from fastmcp import Client

        async with Client(mcp) as client:
            response = await client.call_tool("catalog_list_targets", {"domain": "coulombcore"})
        targets = _data(response)
        # all() is True for an empty iterable, so a filter that wrongly drops
        # everything would pass unnoticed.  The catalog used here is expected
        # to hold at least one coulombcore target (the show-target test in this
        # module reads state-hub with that domain).
        assert targets, "domain filter returned no targets"
        assert all(t["domain"] == "coulombcore" for t in targets)

    async def test_catalog_list_targets_no_catalog(self, env_no_catalog):
        """Without a configured catalog the first item carries an ``error`` key."""
        from fastmcp import Client

        async with Client(mcp) as client:
            response = await client.call_tool("catalog_list_targets", {})
        targets = _data(response)
        assert isinstance(targets, list)
        assert "error" in targets[0]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# catalog_show_target
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestMcpCatalogShowTarget:
    """Tests for the ``catalog_show_target`` MCP tool."""

    @pytest.mark.capability("catalog_show_target")
    @pytest.mark.access_mode("mcp")
    async def test_catalog_show_target_returns_metadata(self, env_catalog):
        """Looking up ``state-hub`` returns its id and domain."""
        from fastmcp import Client

        async with Client(mcp) as client:
            response = await client.call_tool("catalog_show_target", {"target_id": "state-hub"})
        target = _data(response)
        assert target["id"] == "state-hub"
        assert target["domain"] == "coulombcore"

    async def test_catalog_show_target_not_found(self, env_catalog):
        """An unknown target id yields an ``error`` key."""
        from fastmcp import Client

        async with Client(mcp) as client:
            response = await client.call_tool("catalog_show_target", {"target_id": "nonexistent"})
        target = _data(response)
        assert "error" in target

    async def test_catalog_show_target_no_catalog(self, env_no_catalog):
        """Without a configured catalog the tool yields an ``error`` key."""
        from fastmcp import Client

        async with Client(mcp) as client:
            response = await client.call_tool("catalog_show_target", {"target_id": "x"})
        target = _data(response)
        assert "error" in target
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# catalog_list_domains
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestMcpCatalogListDomains:
    """Tests for the ``catalog_list_domains`` MCP tool."""

    @pytest.mark.capability("catalog_list_domains")
    @pytest.mark.access_mode("mcp")
    async def test_catalog_list_domains_returns_list(self, env_catalog):
        """The listing contains the ``coulombcore`` domain."""
        from fastmcp import Client

        async with Client(mcp) as client:
            response = await client.call_tool("catalog_list_domains", {})
        domains = _data(response)
        assert isinstance(domains, list)
        assert any(d["id"] == "coulombcore" for d in domains)

    async def test_catalog_list_domains_no_catalog(self, env_no_catalog):
        """Without a configured catalog the first item carries an ``error`` key."""
        from fastmcp import Client

        async with Client(mcp) as client:
            response = await client.call_tool("catalog_list_domains", {})
        domains = _data(response)
        assert isinstance(domains, list)
        assert "error" in domains[0]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# catalog_validate
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestMcpCatalogValidate:
    """Tests for the ``catalog_validate`` MCP tool."""

    @pytest.mark.capability("catalog_validate")
    @pytest.mark.access_mode("mcp")
    async def test_catalog_validate_clean(self, env_catalog):
        """The fixture catalog validates cleanly."""
        from fastmcp import Client

        async with Client(mcp) as client:
            response = await client.call_tool("catalog_validate", {})
        report = _data(response)
        assert report["valid"] is True

    async def test_catalog_validate_no_catalog(self, env_no_catalog):
        """Without a configured catalog the report is invalid and lists errors."""
        from fastmcp import Client

        async with Client(mcp) as client:
            response = await client.call_tool("catalog_validate", {})
        report = _data(response)
        assert report["valid"] is False
        assert len(report["errors"]) > 0

    async def test_catalog_validate_with_errors(self, tmp_path, monkeypatch):
        """A target referencing an undefined bridge is reported by name."""
        # Build a minimal catalog: one domain "d" with one target "t" whose
        # reachable_via entry names a bridge that is not defined anywhere.
        catalog_root = tmp_path / "bad-catalog"
        domain_dir = catalog_root / "domains" / "d"
        targets_dir = domain_dir / "targets"
        targets_dir.mkdir(parents=True)
        (domain_dir / "domain.yaml").write_text("type: domain\nid: d\nname: D\n")
        (targets_dir / "t.yaml").write_text(
            "type: target\nid: t\ndomain: d\nkind: service\n"
            "reachable_via:\n - missing-bridge\n"
        )

        config_file = tmp_path / "tunnels.yaml"
        config_file.write_text(f"tunnels: {{}}\nactors: {{}}\ncatalog_path: {catalog_root}\n")
        monkeypatch.setenv("BRIDGE_CONFIG", str(config_file))
        monkeypatch.setenv("BRIDGE_STATE_DIR", str(tmp_path / "state"))

        from fastmcp import Client

        async with Client(mcp) as client:
            response = await client.call_tool("catalog_validate", {})
        report = _data(response)
        assert report["valid"] is False
        assert any("missing-bridge" in message for message in report["errors"])
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# catalog_show_bridge
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestMcpCatalogShowBridge:
    """Tests for the ``catalog_show_bridge`` MCP tool."""

    @pytest.mark.capability("catalog_show_bridge")
    @pytest.mark.access_mode("mcp")
    async def test_catalog_show_bridge_returns_metadata(self, env_catalog):
        """Looking up ``state-hub-coulombcore`` returns its id and host."""
        from fastmcp import Client

        arguments = {"bridge_id": "state-hub-coulombcore"}
        async with Client(mcp) as client:
            response = await client.call_tool("catalog_show_bridge", arguments)
        bridge = _data(response)
        assert bridge["id"] == "state-hub-coulombcore"
        assert bridge["host"] == "coulombcore.local"

    async def test_catalog_show_bridge_not_found(self, env_catalog):
        """An unknown bridge id yields an ``error`` key."""
        from fastmcp import Client

        async with Client(mcp) as client:
            response = await client.call_tool("catalog_show_bridge", {"bridge_id": "nonexistent"})
        bridge = _data(response)
        assert "error" in bridge

    async def test_catalog_show_bridge_no_catalog(self, env_no_catalog):
        """Without a configured catalog the tool yields an ``error`` key."""
        from fastmcp import Client

        async with Client(mcp) as client:
            response = await client.call_tool("catalog_show_bridge", {"bridge_id": "x"})
        bridge = _data(response)
        assert "error" in bridge
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# bridge_check
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestMcpBridgeCheck:
    """Tests for the ``bridge_check`` MCP tool.

    The diagnostics entry points are patched on ``bridge.mcp_server.server``
    so no real SSH process or port probing is involved.
    """

    @pytest.mark.capability("bridge_check")
    @pytest.mark.access_mode("mcp")
    async def test_bridge_check_tool(self, env_simple):
        """bridge_check returns a list of dicts with 'ok' key."""
        from bridge.diagnostics import TunnelCheckResult

        mock_result = TunnelCheckResult(
            tunnel="test-tunnel",
            ssh_process="ok",
            pid=12345,
            remote_port="listening",
            local_api=None,
            latency_ms=None,
            stale_state=False,
        )
        with patch(
            "bridge.mcp_server.server.check_all_tunnels", return_value=[mock_result]
        ) as mock_check_all:
            from fastmcp import Client

            async with Client(mcp) as client:
                response = await client.call_tool("bridge_check", {})

        # Make sure the rows below really come from the patched function.
        assert mock_check_all.called
        rows = _data(response)
        assert isinstance(rows, list)
        assert len(rows) == 1
        row = rows[0]
        assert "ok" in row
        assert row["ok"] is True
        assert row["tunnel"] == "test-tunnel"
        assert row["ssh_process"] == "ok"
        assert row["remote_port"] == "listening"

    async def test_bridge_check_specific_tunnel(self, env_simple):
        """bridge_check with tunnel arg calls check_tunnel for that tunnel."""
        from bridge.diagnostics import TunnelCheckResult

        mock_result = TunnelCheckResult(
            tunnel="test-tunnel",
            ssh_process="dead",
            pid=None,
            remote_port="closed",
            local_api=None,
            latency_ms=None,
            stale_state=True,
        )
        with patch(
            "bridge.mcp_server.server.check_tunnel", return_value=mock_result
        ) as mock_check:
            from fastmcp import Client

            async with Client(mcp) as client:
                response = await client.call_tool("bridge_check", {"tunnel": "test-tunnel"})

        # The docstring promises check_tunnel is used; verify it explicitly.
        assert mock_check.called
        rows = _data(response)
        assert isinstance(rows, list)
        assert rows[0]["ok"] is False
        assert rows[0]["stale_state"] is True

    async def test_bridge_check_unknown_tunnel(self, env_simple):
        """bridge_check with unknown tunnel returns error dict."""
        from fastmcp import Client

        async with Client(mcp) as client:
            response = await client.call_tool("bridge_check", {"tunnel": "nonexistent"})
        rows = _data(response)
        assert isinstance(rows, list)
        assert "error" in rows[0]

    async def test_bridge_check_bad_config(self, tmp_path, monkeypatch):
        """bridge_check with bad config returns error dict."""
        monkeypatch.setenv("BRIDGE_CONFIG", str(tmp_path / "nonexistent.yaml"))
        # Keep the test hermetic: like every other environment set up in this
        # module, never let a BRIDGE_STATE_DIR from the caller's shell leak in.
        monkeypatch.setenv("BRIDGE_STATE_DIR", str(tmp_path / "state"))
        from fastmcp import Client

        async with Client(mcp) as client:
            response = await client.call_tool("bridge_check", {})
        rows = _data(response)
        assert isinstance(rows, list)
        assert "error" in rows[0]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Resources
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestMcpResources:
    """Tests that each MCP resource URI returns a JSON list."""

    @staticmethod
    def _first_text(contents) -> str:
        """Return the first item's ``text`` if it has one, else ``str(item)``."""
        if hasattr(contents[0], "text"):
            return contents[0].text
        return str(contents[0])

    async def test_bridge_status_resource(self, env_simple):
        """``bridge://status`` decodes to a list."""
        from fastmcp import Client

        async with Client(mcp) as client:
            contents = await client.read_resource("bridge://status")
        decoded = json.loads(self._first_text(contents))
        assert isinstance(decoded, list)

    async def test_catalog_domains_resource(self, env_catalog):
        """``catalog://domains`` decodes to a list."""
        from fastmcp import Client

        async with Client(mcp) as client:
            contents = await client.read_resource("catalog://domains")
        decoded = json.loads(self._first_text(contents))
        assert isinstance(decoded, list)

    async def test_catalog_targets_resource(self, env_catalog):
        """``catalog://targets`` decodes to a list."""
        from fastmcp import Client

        async with Client(mcp) as client:
            contents = await client.read_resource("catalog://targets")
        decoded = json.loads(self._first_text(contents))
        assert isinstance(decoded, list)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# T15 — Agent workflow integration test: bridge_status → bridge_up → bridge_status
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestMcpAgentWorkflow:
    """T15: Verify the MCP layer supports an agent's typical tunnel management workflow."""

    @pytest.mark.capability("bridge_up")
    @pytest.mark.access_mode("mcp")
    async def test_agent_status_up_status_workflow(self, env_simple, tmp_path):
        """Agent workflow: check status (stopped) → start tunnel → verify started."""
        from fastmcp import Client

        from bridge.models import BridgeState

        state_dir = tmp_path / "state"

        # Step 1: bridge_status reports the tunnel as stopped.
        async with Client(mcp) as client:
            response = await client.call_tool("bridge_status", {})
        before = _data(response)
        assert before[0]["state"] == BridgeState.STOPPED.value

        # Step 2: bridge_up with a patched TunnelManager whose start() writes
        # the state and pid files into the state directory.
        def mock_start_writes_state():
            state_dir.mkdir(parents=True, exist_ok=True)
            (state_dir / "test-tunnel.state").write_text(BridgeState.CONNECTED.value)
            (state_dir / "test-tunnel.pid").write_text("12345")

        manager = MagicMock()
        manager.is_running.return_value = False
        manager.start.side_effect = mock_start_writes_state

        with patch("bridge.manager.TunnelManager", return_value=manager):
            async with Client(mcp) as client:
                response = await client.call_tool("bridge_up", {"tunnel": "test-tunnel"})

        up_data = _data(response)
        assert "test-tunnel" in up_data["started"]

        # Step 3: bridge_status now reflects the connected state written above.
        async with Client(mcp) as client:
            response = await client.call_tool("bridge_status", {})
        after = _data(response)
        assert after[0]["tunnel"] == "test-tunnel"
        assert after[0]["state"] == BridgeState.CONNECTED.value
|