generated from coulomb/repo-seed
Implements the full BRIDGE-WP-0003 workplan: 188 tests passing, 0 lint errors.

## What's added

**Capability registry** (`src/bridge/capabilities.py`):
- 10 capabilities with required_access_modes (cli/mcp/skill)
- Single source of truth for what OpsBridge does and where

**MCP server** (`src/bridge/mcp_server/server.py`):
- 10 FastMCP tools: bridge_up/down/restart/status/logs + 5 catalog_* tools
- 3 resources: bridge://status, catalog://domains, catalog://targets
- `.mcp.json` for project-scope auto-registration
- `scripts/register_mcp.py` for user-scope machine-global registration

**Skill** (`~/.claude/plugins/ops-bridge/bridge-status.md`):
- /bridge-status: health table with emoji indicators + remediation advice

**Cross-mode test coverage enforcement**:
- `tests/conftest.py`: capability/access_mode marks + collect_capability_coverage()
- `tests/test_mcp.py`: 31 FastMCP in-process client tests (Client(mcp) pattern)
- `tests/test_skill.py`: static skill lint against capability registry
- `tests/test_coverage_completeness.py`: meta-test that fails if any required (capability × mode) pair lacks a test; also validates CLI commands and MCP tools are registered in the capability registry

**ADR** (`architecture/adr-001-cross-mode-capability-registry.md`):
- Documents the registry pattern and FastMCP 3.x testing approach

Key implementation note: FastMCP 3.x in-process results are in result.content[0].text (JSON string), not result.data directly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
90 lines
3.2 KiB
Python
90 lines
3.2 KiB
Python
"""Tests for audit logging."""
|
|
import json
|
|
|
|
import pytest
|
|
|
|
from bridge.audit import AuditLogger, AuditEvent
|
|
|
|
|
|
@pytest.fixture
def log_dir(tmp_path):
    """Return the ``bridge`` path beneath pytest's per-test ``tmp_path``.

    Only the path is built here; this fixture does not create the directory.
    """
    bridge_dir = tmp_path.joinpath("bridge")
    return bridge_dir
|
|
|
|
|
|
@pytest.fixture
def logger(log_dir):
    """Return an ``AuditLogger`` constructed with ``state_dir=log_dir``."""
    audit_logger = AuditLogger(state_dir=log_dir)
    return audit_logger
|
|
|
|
|
|
class TestAuditLogger:
    """Tests for ``AuditLogger`` and the ``AuditEvent`` enum.

    Most tests log through the ``logger`` fixture and then inspect either the
    ``<tunnel>.log`` file under ``log_dir`` or the result of ``read_events``.
    """

    def test_log_event_creates_file(self, logger, log_dir):
        """Logging one event creates ``<tunnel>.log`` under ``log_dir``."""
        logger.log(
            tunnel="my-tunnel",
            event=AuditEvent.BRIDGE_STARTED,
            actor="operator.bernd",
            actor_class="human",
        )
        log_file = log_dir / "my-tunnel.log"
        assert log_file.exists()

    def test_log_event_is_json_line(self, logger, log_dir):
        """A single logged event is written as exactly one JSON line."""
        logger.log(
            tunnel="my-tunnel",
            event=AuditEvent.BRIDGE_STARTED,
            actor="operator.bernd",
            actor_class="human",
        )
        lines = (log_dir / "my-tunnel.log").read_text().strip().splitlines()
        assert len(lines) == 1
        entry = json.loads(lines[0])
        assert entry["tunnel"] == "my-tunnel"
        assert entry["event"] == "bridge_started"
        assert entry["actor"] == "operator.bernd"
        assert entry["actor_class"] == "human"
        assert "timestamp" in entry

    def test_multiple_events_append(self, logger, log_dir):
        """Successive events for one tunnel append lines to the same file."""
        for event in [
            AuditEvent.BRIDGE_STARTED,
            AuditEvent.BRIDGE_CONNECTED,
            AuditEvent.BRIDGE_STOPPED,
        ]:
            logger.log(tunnel="t", event=event, actor="a", actor_class="human")
        lines = (log_dir / "t.log").read_text().strip().splitlines()
        assert len(lines) == 3

    def test_log_with_detail(self, logger, log_dir):
        """The optional ``detail`` argument is stored in the logged entry."""
        logger.log(
            tunnel="t",
            event=AuditEvent.HEALTH_CHECK_FAILED,
            actor="a",
            actor_class="automation",
            detail="connection refused",
        )
        entry = json.loads((log_dir / "t.log").read_text().strip())
        assert entry["detail"] == "connection refused"

    def test_all_event_types_defined(self):
        """``AuditEvent`` exposes every event value these tests rely on."""
        events = {e.value for e in AuditEvent}
        required = {
            "bridge_started",
            "bridge_connected",
            "bridge_disconnected",
            "bridge_reconnecting",
            "health_check_failed",
            "health_check_recovered",
            "bridge_stopped",
        }
        # One set difference reports every missing value at once instead of
        # stopping at the first failed membership check.
        missing = required - events
        assert not missing, f"AuditEvent is missing values: {sorted(missing)}"

    def test_timestamp_is_iso8601(self, logger, log_dir):
        """The logged ``timestamp`` parses with ``datetime.fromisoformat``."""
        from datetime import datetime

        logger.log(
            tunnel="t",
            event=AuditEvent.BRIDGE_STOPPED,
            actor="a",
            actor_class="human",
        )
        entry = json.loads((log_dir / "t.log").read_text().strip())
        # Parsing is the real check: fromisoformat raises on a malformed
        # value. UTC-aware and naive timestamps are both acceptable, so
        # tzinfo is deliberately not asserted.
        dt = datetime.fromisoformat(entry["timestamp"])
        assert isinstance(dt, datetime)

    def test_read_events(self, logger, log_dir):
        """``read_events`` returns logged entries in the order written."""
        logger.log(
            tunnel="t",
            event=AuditEvent.BRIDGE_STARTED,
            actor="a",
            actor_class="human",
        )
        logger.log(
            tunnel="t",
            event=AuditEvent.BRIDGE_STOPPED,
            actor="a",
            actor_class="human",
        )
        events = logger.read_events("t")
        assert len(events) == 2
        assert events[0]["event"] == "bridge_started"

    def test_read_events_missing_returns_empty(self, logger):
        """Reading a tunnel that was never logged yields an empty list."""
        assert logger.read_events("nonexistent") == []
|