Add mock file audit backend

This commit is contained in:
2026-06-01 23:44:03 +02:00
parent a3129afa42
commit afa15cd1b9
9 changed files with 353 additions and 1 deletions

24
Makefile Normal file
View File

@@ -0,0 +1,24 @@
SHELL := /usr/bin/env bash
.DEFAULT_GOAL := help
AUDIT_CORE_MOCK_DIR ?= /tmp/audit-core
test: ## Run unit tests
python3 -m pytest -q
mock-audit-smoke: ## Write one non-secret smoke event to the mock audit backend
AUDIT_CORE_MOCK_DIR="$(AUDIT_CORE_MOCK_DIR)" python3 -m audit_core emit \
--source audit-core \
--action audit_core.mock_backend.smoke \
--resource "$(AUDIT_CORE_MOCK_DIR)" \
--outcome success \
--detail backend=mock-file
mock-audit-cleanup: ## Remove mock audit files older than the retention window
AUDIT_CORE_MOCK_DIR="$(AUDIT_CORE_MOCK_DIR)" python3 -m audit_core cleanup
help: ## Show this help
@awk 'BEGIN {FS = ":.*##"; printf "\nUsage:\n make \033[36m<target>\033[0m\n"} \
/^[a-zA-Z_-]+:.*?##/ { printf " \033[36m%-24s\033[0m %s\n", $$1, $$2 }' $(MAKEFILE_LIST)
.PHONY: test mock-audit-smoke mock-audit-cleanup help

View File

@@ -1 +1,47 @@
Reliable multi-tenant auto setup audit capability
Reliable multi-tenant auto setup audit capability
## Development Mock Backend
The first implementation is intentionally tiny: a replaceable audit interface
with a mock file backend.
By default it writes JSONL audit events to:
```text
/tmp/audit-core/audit-YYYYMMDDTHH.jsonl
```
Files older than 7 days are removed when the backend writes or when cleanup is
run explicitly. This backend is for local integration and bootstrap wiring. It
is not durable audit custody.
Example:
```bash
python3 -m audit_core emit \
--source openbao \
--action openbao.authenticated_readiness_proof \
--resource openbao/openbao-0 \
--outcome success \
--detail file_audit_visible=true \
--detail backend=mock-file
```
Cleanup:
```bash
python3 -m audit_core cleanup
```
Make targets:
```bash
make test
make mock-audit-smoke
make mock-audit-cleanup
```
Environment:
- `AUDIT_CORE_MOCK_DIR`: override the output directory.
- `AUDIT_CORE_MOCK_RETENTION_DAYS`: override the default 7-day cleanup window.

6
audit_core/__init__.py Normal file
View File

@@ -0,0 +1,6 @@
"""Audit Core public interface."""
from audit_core.interface import AuditBackend, AuditEvent
from audit_core.mock_file_backend import MockFileAuditBackend
__all__ = ["AuditBackend", "AuditEvent", "MockFileAuditBackend"]

4
audit_core/__main__.py Normal file
View File

@@ -0,0 +1,4 @@
from audit_core.cli import main
raise SystemExit(main())

82
audit_core/cli.py Normal file
View File

@@ -0,0 +1,82 @@
"""Command line helpers for the development audit backend."""
from __future__ import annotations
import argparse
import json
from typing import Any
from audit_core.interface import AuditEvent
from audit_core.mock_file_backend import MockFileAuditBackend
def parse_detail(values: list[str]) -> dict[str, Any]:
details: dict[str, Any] = {}
for item in values:
if "=" not in item:
raise SystemExit(f"detail must be key=value: {item}")
key, value = item.split("=", 1)
details[key] = value
return details
def emit(args: argparse.Namespace) -> int:
details = parse_detail(args.detail)
event = AuditEvent(
tenant=args.tenant,
scope=args.scope,
source=args.source,
actor=args.actor,
action=args.action,
resource=args.resource,
outcome=args.outcome,
reason=args.reason,
details=details,
)
backend = MockFileAuditBackend(base_dir=args.dir, retention_days=args.retention_days)
path = backend.emit(event)
print(json.dumps({"ok": True, "path": path, "event_id": event.event_id}, sort_keys=True))
return 0
def cleanup(args: argparse.Namespace) -> int:
backend = MockFileAuditBackend(base_dir=args.dir, retention_days=args.retention_days)
removed = backend.cleanup_old_files()
print(json.dumps({"ok": True, "removed": removed}, sort_keys=True))
return 0
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Audit Core development CLI")
sub = parser.add_subparsers(dest="command", required=True)
emit_parser = sub.add_parser("emit", help="Write one event to the mock file backend.")
emit_parser.add_argument("--tenant", default="platform")
emit_parser.add_argument("--scope", default="platform-control-plane")
emit_parser.add_argument("--source", required=True)
emit_parser.add_argument("--actor")
emit_parser.add_argument("--action", required=True)
emit_parser.add_argument("--resource", required=True)
emit_parser.add_argument("--outcome", default="success")
emit_parser.add_argument("--reason")
emit_parser.add_argument("--detail", action="append", default=[])
emit_parser.add_argument("--dir", default=None)
emit_parser.add_argument("--retention-days", type=int, default=None)
emit_parser.set_defaults(func=emit)
cleanup_parser = sub.add_parser("cleanup", help="Remove mock audit files older than retention.")
cleanup_parser.add_argument("--dir", default=None)
cleanup_parser.add_argument("--retention-days", type=int, default=None)
cleanup_parser.set_defaults(func=cleanup)
return parser
def main() -> int:
parser = build_parser()
args = parser.parse_args()
return args.func(args)
if __name__ == "__main__":
raise SystemExit(main())

53
audit_core/interface.py Normal file
View File

@@ -0,0 +1,53 @@
"""Small audit interface shared by backends and integrations."""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Protocol
from uuid import uuid4
def utc_now() -> str:
return datetime.now(timezone.utc).replace(microsecond=0).isoformat()
@dataclass(frozen=True)
class AuditEvent:
"""Minimal event envelope for the first Audit Core implementation."""
source: str
action: str
resource: str
outcome: str
tenant: str = "platform"
scope: str = "platform-control-plane"
actor: str | None = None
reason: str | None = None
details: dict[str, Any] = field(default_factory=dict)
event_id: str = field(default_factory=lambda: str(uuid4()))
observed_at: str = field(default_factory=utc_now)
schema_version: str = "audit-core.event.v1alpha1"
def as_record(self) -> dict[str, Any]:
return {
"schema_version": self.schema_version,
"event_id": self.event_id,
"observed_at": self.observed_at,
"tenant": self.tenant,
"scope": self.scope,
"source": self.source,
"actor": self.actor,
"action": self.action,
"resource": self.resource,
"outcome": self.outcome,
"reason": self.reason,
"details": self.details,
}
class AuditBackend(Protocol):
"""Protocol implemented by audit sinks."""
def emit(self, event: AuditEvent) -> str:
"""Persist an event and return a backend-specific reference."""

View File

@@ -0,0 +1,65 @@
"""Development-only file backend for Audit Core.
This backend intentionally writes local JSONL files under /tmp by default. It
is useful for wiring integrations before the durable Audit Core archive exists,
but it is not production audit custody.
"""
from __future__ import annotations
import json
import os
from datetime import datetime, timedelta, timezone
from pathlib import Path
from audit_core.interface import AuditEvent
class MockFileAuditBackend:
"""Append audit events to hourly JSONL files and clean up old files."""
def __init__(
self,
base_dir: str | Path | None = None,
retention_days: int | None = None,
now: datetime | None = None,
) -> None:
self.base_dir = Path(base_dir or os.environ.get("AUDIT_CORE_MOCK_DIR", "/tmp/audit-core"))
self.retention_days = int(
retention_days
if retention_days is not None
else os.environ.get("AUDIT_CORE_MOCK_RETENTION_DAYS", "7")
)
self._now = now
def emit(self, event: AuditEvent) -> str:
self.base_dir.mkdir(parents=True, exist_ok=True)
self.cleanup_old_files()
path = self.current_path()
record = event.as_record()
with path.open("a", encoding="utf-8") as handle:
handle.write(json.dumps(record, sort_keys=True, separators=(",", ":")) + "\n")
return str(path)
def cleanup_old_files(self) -> list[str]:
if self.retention_days < 0 or not self.base_dir.exists():
return []
cutoff = self.now() - timedelta(days=self.retention_days)
removed: list[str] = []
for path in self.base_dir.glob("audit-*.jsonl"):
try:
modified = datetime.fromtimestamp(path.stat().st_mtime, tz=timezone.utc)
except FileNotFoundError:
continue
if modified < cutoff:
path.unlink(missing_ok=True)
removed.append(str(path))
return removed
def current_path(self) -> Path:
stamp = self.now().strftime("%Y%m%dT%H")
return self.base_dir / f"audit-{stamp}.jsonl"
def now(self) -> datetime:
return self._now or datetime.now(timezone.utc)

16
pyproject.toml Normal file
View File

@@ -0,0 +1,16 @@
[project]
name = "audit-core"
version = "0.1.0"
description = "Standalone audit fabric interfaces and development backends."
readme = "README.md"
requires-python = ">=3.11"
license = { file = "LICENSE" }
authors = [
{ name = "NetKingdom / Audit Core maintainers" }
]
[project.optional-dependencies]
dev = ["pytest"]
[tool.pytest.ini_options]
testpaths = ["tests"]

View File

@@ -0,0 +1,56 @@
from __future__ import annotations
import json
import os
from datetime import datetime, timedelta, timezone
from audit_core.interface import AuditEvent
from audit_core.mock_file_backend import MockFileAuditBackend
def test_emit_writes_hourly_jsonl_file(tmp_path):
now = datetime(2026, 6, 1, 20, 30, tzinfo=timezone.utc)
backend = MockFileAuditBackend(base_dir=tmp_path, now=now)
path = backend.emit(
AuditEvent(
source="openbao",
action="audit.verify",
resource="openbao/openbao-0",
outcome="success",
details={"file_audit_visible": True},
)
)
assert path.endswith("audit-20260601T20.jsonl")
lines = (tmp_path / "audit-20260601T20.jsonl").read_text(encoding="utf-8").splitlines()
assert len(lines) == 1
record = json.loads(lines[0])
assert record["source"] == "openbao"
assert record["tenant"] == "platform"
assert record["scope"] == "platform-control-plane"
assert record["details"] == {"file_audit_visible": True}
def test_cleanup_removes_files_older_than_retention(tmp_path):
now = datetime(2026, 6, 8, 20, 30, tzinfo=timezone.utc)
old_file = tmp_path / "audit-20260531T20.jsonl"
fresh_file = tmp_path / "audit-20260608T20.jsonl"
ignored_file = tmp_path / "other.jsonl"
old_file.write_text("{}\n", encoding="utf-8")
fresh_file.write_text("{}\n", encoding="utf-8")
ignored_file.write_text("{}\n", encoding="utf-8")
old_ts = (now - timedelta(days=8)).timestamp()
fresh_ts = (now - timedelta(days=1)).timestamp()
os.utime(old_file, (old_ts, old_ts))
os.utime(fresh_file, (fresh_ts, fresh_ts))
backend = MockFileAuditBackend(base_dir=tmp_path, retention_days=7, now=now)
removed = backend.cleanup_old_files()
assert str(old_file) in removed
assert not old_file.exists()
assert fresh_file.exists()
assert ignored_file.exists()