from __future__ import annotations import json import os from datetime import datetime, timedelta, timezone from audit_core.interface import AuditEvent from audit_core.mock_file_backend import MockFileAuditBackend def test_emit_writes_hourly_jsonl_file(tmp_path): now = datetime(2026, 6, 1, 20, 30, tzinfo=timezone.utc) backend = MockFileAuditBackend(base_dir=tmp_path, now=now) path = backend.emit( AuditEvent( source="openbao", action="audit.verify", resource="openbao/openbao-0", outcome="success", details={"file_audit_visible": True}, ) ) assert path.endswith("audit-20260601T20.jsonl") lines = (tmp_path / "audit-20260601T20.jsonl").read_text(encoding="utf-8").splitlines() assert len(lines) == 1 record = json.loads(lines[0]) assert record["source"] == "openbao" assert record["tenant"] == "platform" assert record["scope"] == "platform-control-plane" assert record["details"] == {"file_audit_visible": True} def test_cleanup_removes_files_older_than_retention(tmp_path): now = datetime(2026, 6, 8, 20, 30, tzinfo=timezone.utc) old_file = tmp_path / "audit-20260531T20.jsonl" fresh_file = tmp_path / "audit-20260608T20.jsonl" ignored_file = tmp_path / "other.jsonl" old_file.write_text("{}\n", encoding="utf-8") fresh_file.write_text("{}\n", encoding="utf-8") ignored_file.write_text("{}\n", encoding="utf-8") old_ts = (now - timedelta(days=8)).timestamp() fresh_ts = (now - timedelta(days=1)).timestamp() os.utime(old_file, (old_ts, old_ts)) os.utime(fresh_file, (fresh_ts, fresh_ts)) backend = MockFileAuditBackend(base_dir=tmp_path, retention_days=7, now=now) removed = backend.cleanup_old_files() assert str(old_file) in removed assert not old_file.exists() assert fresh_file.exists() assert ignored_file.exists()