Files
kontextual-engine/src/kontextual_engine/core/actors.py

114 lines
3.4 KiB
Python

"""Actor and operation-context primitives."""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
from .primitives import compact_dict, new_id
class ActorType(str, Enum):
    """Enumeration of the actor categories this module recognises.

    Every member is also a ``str``: it compares equal to its raw value and
    can be recovered from that value, e.g. ``ActorType("human")``.
    """

    HUMAN = "human"
    APPLICATION = "application"
    AUTOMATION = "automation"
    SERVICE_ACCOUNT = "service_account"
    AI_AGENT = "ai_agent"
@dataclass(frozen=True)
class Actor:
    """Frozen dataclass describing a single actor.

    Fields are assigned once at construction. ``metadata`` takes part in
    equality but is excluded from the generated hash: a ``dict`` is
    unhashable and would otherwise make ``hash(actor)`` raise ``TypeError``.
    """

    id: str
    actor_type: ActorType
    display_name: str | None = None
    external_ref: str | None = None
    groups: tuple[str, ...] = ()
    # hash=False keeps the unhashable dict out of __hash__; compare stays at
    # its default (True), so __eq__ still considers metadata.
    metadata: dict[str, Any] = field(default_factory=dict, hash=False)

    @classmethod
    def create(
        cls,
        actor_type: ActorType | str,
        *,
        actor_id: str | None = None,
        display_name: str | None = None,
        external_ref: str | None = None,
        groups: list[str] | tuple[str, ...] | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> "Actor":
        """Build an actor, normalising and defaulting the inputs.

        Args:
            actor_type: An ``ActorType`` member or its string value; it is
                coerced with ``ActorType(actor_type)``.
            actor_id: Identifier to use. When ``None`` or empty, an id is
                obtained from ``new_id("actor")``.
            display_name: Optional display name, stored as given.
            external_ref: Optional external reference, stored as given.
            groups: Group names; copied into a tuple. ``None`` means none.
            metadata: Extra key/value data; shallow-copied into a new dict.
                ``None`` means empty.

        Returns:
            The new ``Actor``.

        Raises:
            ValueError: If ``actor_type`` is not a valid ``ActorType`` value.
        """
        return cls(
            id=actor_id or new_id("actor"),
            actor_type=ActorType(actor_type),
            display_name=display_name,
            external_ref=external_ref,
            groups=tuple(groups or ()),
            metadata=dict(metadata or {}),
        )

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict form of this actor.

        ``actor_type`` is emitted as its string value, ``groups`` as a list
        and ``metadata`` as a shallow copy. The mapping is passed through
        ``compact_dict`` before being returned.
        """
        # NOTE(review): compact_dict comes from .primitives; presumably it
        # drops None/empty entries - confirm against its definition.
        return compact_dict(
            {
                "id": self.id,
                "actor_type": self.actor_type.value,
                "display_name": self.display_name,
                "external_ref": self.external_ref,
                "groups": list(self.groups),
                "metadata": dict(self.metadata),
            }
        )

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "Actor":
        """Rebuild an actor from a mapping shaped like ``to_dict`` output.

        ``id`` and ``actor_type`` are required. Every other key is optional,
        and a missing key or an explicit ``None`` (e.g. JSON ``null``) for
        ``groups`` / ``metadata`` is treated as empty, matching ``create``.

        Raises:
            KeyError: If ``id`` or ``actor_type`` is missing.
            ValueError: If ``actor_type`` is not a valid ``ActorType`` value.
        """
        return cls(
            id=data["id"],
            actor_type=ActorType(data["actor_type"]),
            display_name=data.get("display_name"),
            external_ref=data.get("external_ref"),
            # `or` rather than a .get() default: the key may be present
            # with a None value, which tuple()/dict() cannot consume.
            groups=tuple(data.get("groups") or ()),
            metadata=dict(data.get("metadata") or {}),
        )
@dataclass(frozen=True)
class OperationContext:
    """Frozen dataclass bundling an actor with per-operation context data.

    The three mapping fields take part in equality but are excluded from
    the generated hash, because ``dict`` values are unhashable. The hash is
    therefore derived from ``actor``, ``correlation_id`` and
    ``delegated_actor`` only, and works whenever those values are hashable.
    """

    actor: Actor
    correlation_id: str
    delegated_actor: Actor | None = None
    # hash=False keeps the unhashable dicts out of __hash__; compare stays at
    # its default (True), so __eq__ still considers them.
    request_scope: dict[str, Any] = field(default_factory=dict, hash=False)
    policy_scope: dict[str, Any] = field(default_factory=dict, hash=False)
    metadata: dict[str, Any] = field(default_factory=dict, hash=False)

    @classmethod
    def create(
        cls,
        actor: Actor,
        *,
        correlation_id: str | None = None,
        delegated_actor: Actor | None = None,
        request_scope: dict[str, Any] | None = None,
        policy_scope: dict[str, Any] | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> "OperationContext":
        """Build a context, defaulting and copying the inputs.

        Args:
            actor: The actor stored in ``actor``.
            correlation_id: Identifier to use. When ``None`` or empty, one
                is obtained from ``new_id("corr")``.
            delegated_actor: Optional second actor, stored as given.
            request_scope: Shallow-copied into a new dict; ``None`` means
                empty.
            policy_scope: Shallow-copied into a new dict; ``None`` means
                empty.
            metadata: Shallow-copied into a new dict; ``None`` means empty.

        Returns:
            The new ``OperationContext``.
        """
        return cls(
            actor=actor,
            delegated_actor=delegated_actor,
            correlation_id=correlation_id or new_id("corr"),
            request_scope=dict(request_scope or {}),
            policy_scope=dict(policy_scope or {}),
            metadata=dict(metadata or {}),
        )

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict form of this context.

        Actors are serialised with their own ``to_dict``; the mapping fields
        are shallow-copied. The mapping is passed through ``compact_dict``
        before being returned.
        """
        delegated = (
            self.delegated_actor.to_dict()
            if self.delegated_actor is not None
            else None
        )
        # NOTE(review): compact_dict comes from .primitives; presumably it
        # drops None/empty entries - confirm against its definition.
        return compact_dict(
            {
                "actor": self.actor.to_dict(),
                "delegated_actor": delegated,
                "correlation_id": self.correlation_id,
                "request_scope": dict(self.request_scope),
                "policy_scope": dict(self.policy_scope),
                "metadata": dict(self.metadata),
            }
        )

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "OperationContext":
        """Rebuild a context from a mapping shaped like ``to_dict`` output.

        ``actor`` and ``correlation_id`` are required. ``delegated_actor``
        and the mapping fields are optional; a missing key or an explicit
        ``None`` is treated as absent / empty.

        Raises:
            KeyError: If ``actor`` or ``correlation_id`` is missing, or if
                a nested actor mapping lacks a required key.
        """
        delegated = data.get("delegated_actor")
        return cls(
            actor=Actor.from_dict(data["actor"]),
            correlation_id=data["correlation_id"],
            delegated_actor=Actor.from_dict(delegated) if delegated else None,
            request_scope=dict(data.get("request_scope") or {}),
            policy_scope=dict(data.get("policy_scope") or {}),
            metadata=dict(data.get("metadata") or {}),
        )