Files
ops-bridge/tests/test_audit.py
Bernd Worsch a7eaf59ced feat: implement OpsBridge CLI (BRIDGE-WP-0001)
Full TDD implementation of the `bridge` CLI tool covering all phases
from BRIDGE-WP-0001: project scaffolding, config loading, state
management, audit logging, health checks, tunnel lifecycle manager, and
all CLI commands (up/down/restart/status/logs). 77 tests, all green.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 01:40:08 +00:00

91 lines
3.2 KiB
Python

"""Tests for audit logging."""
import json
from pathlib import Path
import pytest
from bridge.audit import AuditLogger, AuditEvent
@pytest.fixture
def log_dir(tmp_path):
    """Return the ``bridge`` path under pytest's per-test ``tmp_path``.

    Only the path is built here; this fixture does not create the directory.
    """
    bridge_dir = tmp_path / "bridge"
    return bridge_dir
@pytest.fixture
def logger(log_dir):
    """Return an ``AuditLogger`` constructed with ``state_dir=log_dir``."""
    audit_logger = AuditLogger(state_dir=log_dir)
    return audit_logger
class TestAuditLogger:
    """Behavioural tests for ``AuditLogger`` writing per-tunnel log files.

    Every test uses the ``logger`` fixture (an ``AuditLogger`` built with
    ``state_dir=log_dir``) and inspects ``<log_dir>/<tunnel>.log``.
    """

    def test_log_event_creates_file(self, logger, log_dir):
        """Logging one event creates ``<log_dir>/<tunnel>.log``."""
        logger.log(
            tunnel="my-tunnel",
            event=AuditEvent.BRIDGE_STARTED,
            actor="operator.bernd",
            actor_class="human",
        )
        log_file = log_dir / "my-tunnel.log"
        assert log_file.exists()

    def test_log_event_is_json_line(self, logger, log_dir):
        """A single logged event is stored as exactly one JSON line."""
        logger.log(
            tunnel="my-tunnel",
            event=AuditEvent.BRIDGE_STARTED,
            actor="operator.bernd",
            actor_class="human",
        )
        lines = (log_dir / "my-tunnel.log").read_text().strip().splitlines()
        assert len(lines) == 1
        entry = json.loads(lines[0])
        assert entry["tunnel"] == "my-tunnel"
        assert entry["event"] == "bridge_started"
        assert entry["actor"] == "operator.bernd"
        assert entry["actor_class"] == "human"
        assert "timestamp" in entry

    def test_multiple_events_append(self, logger, log_dir):
        """Successive events for one tunnel append lines to the same file."""
        events = [
            AuditEvent.BRIDGE_STARTED,
            AuditEvent.BRIDGE_CONNECTED,
            AuditEvent.BRIDGE_STOPPED,
        ]
        for event in events:
            logger.log(tunnel="t", event=event, actor="a", actor_class="human")
        lines = (log_dir / "t.log").read_text().strip().splitlines()
        assert len(lines) == 3

    def test_log_with_detail(self, logger, log_dir):
        """The optional ``detail`` argument is written into the entry."""
        logger.log(
            tunnel="t",
            event=AuditEvent.HEALTH_CHECK_FAILED,
            actor="a",
            actor_class="automation",
            detail="connection refused",
        )
        entry = json.loads((log_dir / "t.log").read_text().strip())
        assert entry["detail"] == "connection refused"

    def test_all_event_types_defined(self):
        """``AuditEvent`` exposes every event value these tests rely on."""
        events = {e.value for e in AuditEvent}
        assert "bridge_started" in events
        assert "bridge_connected" in events
        assert "bridge_disconnected" in events
        assert "bridge_reconnecting" in events
        assert "health_check_failed" in events
        assert "health_check_recovered" in events
        assert "bridge_stopped" in events

    def test_timestamp_is_iso8601(self, logger, log_dir):
        """The ``timestamp`` field parses as ISO 8601 and is naive or UTC."""
        from datetime import datetime, timedelta

        logger.log(tunnel="t", event=AuditEvent.BRIDGE_STOPPED, actor="a", actor_class="human")
        entry = json.loads((log_dir / "t.log").read_text().strip())
        # fromisoformat raises on a malformed value, so a successful parse is
        # itself the format check.
        dt = datetime.fromisoformat(entry["timestamp"])
        # Naive and UTC timestamps are both acceptable; any other offset is
        # not. The previous ``... or True`` assertion could never fail.
        assert dt.tzinfo is None or dt.utcoffset() == timedelta(0)

    def test_read_events(self, logger, log_dir):
        """``read_events`` returns the logged entries, first event first."""
        logger.log(tunnel="t", event=AuditEvent.BRIDGE_STARTED, actor="a", actor_class="human")
        logger.log(tunnel="t", event=AuditEvent.BRIDGE_STOPPED, actor="a", actor_class="human")
        events = logger.read_events("t")
        assert len(events) == 2
        assert events[0]["event"] == "bridge_started"

    def test_read_events_missing_returns_empty(self, logger):
        """``read_events`` returns ``[]`` for a tunnel with nothing logged."""
        assert logger.read_events("nonexistent") == []