Files
kontextual-engine/src/kontextual_engine/core/audit.py

88 lines
2.7 KiB
Python

"""Audit records for material engine operations."""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
from .actors import OperationContext
from .policy import PolicyDecision
from .primitives import compact_dict, new_id, utc_now
class AuditOutcome(str, Enum):
SUCCESS = "success"
DENIED = "denied"
FAILED = "failed"
PARTIAL = "partial"
REVIEW_REQUIRED = "review_required"
DRY_RUN = "dry_run"
@dataclass(frozen=True)
class AuditEvent:
operation: str
target: str
outcome: AuditOutcome
actor_id: str
correlation_id: str
event_id: str = field(default_factory=lambda: new_id("audit"))
policy_decision: PolicyDecision | None = None
details: dict[str, Any] = field(default_factory=dict)
occurred_at: str = field(default_factory=lambda: utc_now().isoformat())
@classmethod
def from_context(
cls,
operation: str,
target: str,
outcome: AuditOutcome | str,
context: OperationContext,
*,
policy_decision: PolicyDecision | None = None,
details: dict[str, Any] | None = None,
event_id: str | None = None,
) -> "AuditEvent":
return cls(
event_id=event_id or new_id("audit"),
operation=operation,
target=target,
outcome=AuditOutcome(outcome),
actor_id=context.actor.id,
correlation_id=context.correlation_id,
policy_decision=policy_decision,
details=dict(details or {}),
)
def to_dict(self) -> dict[str, Any]:
return compact_dict(
{
"event_id": self.event_id,
"operation": self.operation,
"target": self.target,
"outcome": self.outcome.value,
"actor_id": self.actor_id,
"correlation_id": self.correlation_id,
"policy_decision": self.policy_decision.to_dict() if self.policy_decision else None,
"details": dict(self.details),
"occurred_at": self.occurred_at,
}
)
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "AuditEvent":
return cls(
event_id=data["event_id"],
operation=data["operation"],
target=data["target"],
outcome=AuditOutcome(data["outcome"]),
actor_id=data["actor_id"],
correlation_id=data["correlation_id"],
policy_decision=PolicyDecision.from_dict(data["policy_decision"])
if data.get("policy_decision")
else None,
details=dict(data.get("details", {})),
occurred_at=data["occurred_at"],
)