generated from coulomb/repo-seed
83 lines
2.8 KiB
Python
83 lines
2.8 KiB
Python
"""Command line helpers for the development audit backend."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
from typing import Any
|
|
|
|
from audit_core.interface import AuditEvent
|
|
from audit_core.mock_file_backend import MockFileAuditBackend
|
|
|
|
|
|
def parse_detail(values: list[str]) -> dict[str, Any]:
    """Convert ``key=value`` strings into a details mapping.

    Each item is split on its first ``=`` only, so the value part may itself
    contain ``=``. Values are kept as the raw strings that were passed in.
    When the same key appears more than once, the last occurrence wins.

    Args:
        values: Items of the form ``key=value``.

    Returns:
        A dict mapping each key to its string value; empty when ``values``
        is empty.

    Raises:
        SystemExit: If an item contains no ``=`` or its key part is empty.
    """
    details: dict[str, Any] = {}
    for item in values:
        key, separator, value = item.partition("=")
        # An empty key (e.g. "=value") would otherwise be stored under "",
        # which is not a usable detail name, so it is rejected like an item
        # with no "=" at all.
        if not separator or not key:
            raise SystemExit(f"detail must be key=value: {item}")
        details[key] = value
    return details
|
|
|
|
|
|
def emit(args: argparse.Namespace) -> int:
    """Build one audit event from parsed CLI options and write it out.

    The ``--detail`` items are converted with ``parse_detail`` (which exits
    on a malformed item), an ``AuditEvent`` is constructed from the remaining
    options, and the event is handed to a ``MockFileAuditBackend``. A JSON
    summary with the keys ``ok``, ``path`` and ``event_id`` is printed to
    standard output.

    Args:
        args: Parsed namespace of the ``emit`` subcommand.

    Returns:
        Always ``0``.
    """
    parsed_details = parse_detail(args.detail)

    # These event fields map one-to-one onto same-named CLI options.
    field_names = (
        "tenant",
        "scope",
        "source",
        "actor",
        "action",
        "resource",
        "outcome",
        "reason",
    )
    fields = {name: getattr(args, name) for name in field_names}
    event = AuditEvent(**fields, details=parsed_details)

    backend = MockFileAuditBackend(base_dir=args.dir, retention_days=args.retention_days)
    written_path = backend.emit(event)

    summary = {"ok": True, "path": written_path, "event_id": event.event_id}
    print(json.dumps(summary, sort_keys=True))
    return 0
|
|
|
|
|
|
def cleanup(args: argparse.Namespace) -> int:
    """Run the mock backend's old-file cleanup and print the outcome.

    A ``MockFileAuditBackend`` is created from the ``--dir`` and
    ``--retention-days`` options, its ``cleanup_old_files`` method is called,
    and a JSON summary with the keys ``ok`` and ``removed`` is printed to
    standard output.

    Args:
        args: Parsed namespace of the ``cleanup`` subcommand.

    Returns:
        Always ``0``.
    """
    removed = MockFileAuditBackend(
        base_dir=args.dir,
        retention_days=args.retention_days,
    ).cleanup_old_files()
    report = {"ok": True, "removed": removed}
    print(json.dumps(report, sort_keys=True))
    return 0
|
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
    """Create the argument parser for the development CLI.

    Two subcommands are registered, and one of them is required:

    * ``emit`` takes the event fields plus ``--dir`` / ``--retention-days``
      and dispatches to ``emit``.
    * ``cleanup`` takes only ``--dir`` / ``--retention-days`` and dispatches
      to ``cleanup``.

    The handler for the chosen subcommand is stored on the parsed namespace
    as ``func``.

    Returns:
        The fully configured top-level parser.
    """
    root = argparse.ArgumentParser(description="Audit Core development CLI")
    commands = root.add_subparsers(dest="command", required=True)

    emit_cmd = commands.add_parser("emit", help="Write one event to the mock file backend.")
    # Event-specific options, listed in the order they appear in --help.
    emit_options = (
        ("--tenant", {"default": "platform"}),
        ("--scope", {"default": "platform-control-plane"}),
        ("--source", {"required": True}),
        ("--actor", {}),
        ("--action", {"required": True}),
        ("--resource", {"required": True}),
        ("--outcome", {"default": "success"}),
        ("--reason", {}),
        ("--detail", {"action": "append", "default": []}),
    )
    for flag, options in emit_options:
        emit_cmd.add_argument(flag, **options)
    emit_cmd.set_defaults(func=emit)

    cleanup_cmd = commands.add_parser(
        "cleanup",
        help="Remove mock audit files older than retention.",
    )
    cleanup_cmd.set_defaults(func=cleanup)

    # Both subcommands accept the same backend location/retention options;
    # for ``emit`` they come after the event-specific options above.
    for command in (emit_cmd, cleanup_cmd):
        command.add_argument("--dir", default=None)
        command.add_argument("--retention-days", type=int, default=None)

    return root
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """Parse command-line arguments and run the selected subcommand.

    Args:
        argv: Argument strings to parse, without the program name. When
            ``None`` (the default) argparse falls back to ``sys.argv[1:]``,
            so existing ``main()`` callers behave as before.

    Returns:
        The value returned by the subcommand handler stored on the parsed
        namespace as ``func``.
    """
    parser = build_parser()
    args = parser.parse_args(argv)
    return args.func(args)
|
|
|
|
|
|
if __name__ == "__main__":
    # Use the value returned by main() as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)
|