# File: markitect-tool/src/markitect_tool/policy/adapters.py
# (223 lines, 7.1 KiB, Python)

"""Protocol boundaries for external identity and authorization engines."""
from __future__ import annotations
from dataclasses import asdict, dataclass, field
from typing import Any, Protocol
from markitect_tool.policy.models import PolicyDecision, PolicySubject
@dataclass(frozen=True)
class EnterpriseIdentity:
    """Verified enterprise identity claims normalized for policy mapping.

    Concrete adapters are responsible for validating tokens or assertions before
    returning this shape. Core Markitect only consumes the normalized identity.

    The dataclass is frozen, so fields cannot be rebound after construction.
    The list and dict fields are still ordinary mutable containers, which is
    why the conversion helpers below hand out copies instead of the stored
    objects.
    """

    issuer: str
    subject: str
    identity_scheme: str = "oidc"
    principal_type: str = "human"
    audience: list[str] = field(default_factory=list)
    authorized_party: str | None = None
    preferred_username: str | None = None
    roles: list[str] = field(default_factory=list)
    scopes: list[str] = field(default_factory=list)
    groups: list[str] = field(default_factory=list)
    assurance: dict[str, Any] = field(default_factory=dict)
    directory: dict[str, Any] = field(default_factory=dict)
    claims: dict[str, Any] = field(default_factory=dict)
    provenance: dict[str, Any] = field(default_factory=dict)

    @property
    def canonical_id(self) -> str:
        """Return the identifier ``"<identity_scheme>:<issuer>#<subject>"``."""
        return f"{self.identity_scheme}:{self.issuer}#{self.subject}"

    def _identity_attributes(self) -> dict[str, Any]:
        """Build the identity-derived attribute mapping for a policy subject.

        Every list/dict value is a shallow copy, so mutating the returned
        mapping (or the subject that carries it) cannot alter this identity.
        Nested values inside ``assurance``, ``directory`` and ``provenance``
        are still shared with the identity. The raw ``claims`` mapping is not
        included.
        """
        return {
            "issuer": self.issuer,
            "subject": self.subject,
            "identity_scheme": self.identity_scheme,
            "principal_type": self.principal_type,
            "audience": list(self.audience),
            "authorized_party": self.authorized_party,
            "preferred_username": self.preferred_username,
            "scopes": list(self.scopes),
            "groups": list(self.groups),
            "assurance": dict(self.assurance),
            "directory": dict(self.directory),
            "identity_provenance": dict(self.provenance),
        }

    def to_policy_subject(
        self,
        *,
        allowed_labels: list[str] | None = None,
        trust_zones: list[str] | None = None,
        allowed_actions: list[str] | None = None,
        attributes: dict[str, Any] | None = None,
    ) -> PolicySubject:
        """Convert verified claims into the subject shape used by gateways.

        Args:
            allowed_labels: Labels for the subject; ``None`` means none.
            trust_zones: Trust zones for the subject; ``None`` means none.
            allowed_actions: Actions for the subject; ``None`` means none.
            attributes: Extra attributes merged over the identity-derived
                ones. On a key collision the caller-supplied value wins.

        Returns:
            A ``PolicySubject`` whose ``id`` is :attr:`canonical_id` and whose
            ``roles`` is a copy of this identity's roles. All list arguments
            are copied as well, so the subject shares no top-level container
            with the caller or with this identity.
        """
        # NOTE(review): because caller attributes take precedence, they can
        # replace verified keys such as "issuer" or "subject" -- confirm this
        # override is intended.
        merged_attributes = self._identity_attributes() | dict(attributes or {})
        return PolicySubject(
            id=self.canonical_id,
            allowed_labels=list(allowed_labels or []),
            trust_zones=list(trust_zones or []),
            roles=list(self.roles),
            allowed_actions=list(allowed_actions or []),
            attributes=merged_attributes,
        )

    def to_dict(self) -> dict[str, Any]:
        """Return the identity as a plain dict, including ``canonical_id``.

        ``asdict`` copies the fields recursively; top-level entries whose value
        is ``None`` or an empty list, dict or string are then removed.
        """
        data = asdict(self)
        data["canonical_id"] = self.canonical_id
        return _drop_empty(data)
class IdentityClaimsAdapter(Protocol):
    """Structural interface for OIDC/JWT/SAML verification and normalization.

    Any object exposing a compatible ``verify`` method satisfies this protocol.
    """

    def verify(
        self,
        token_or_assertion: str | dict[str, Any],
        *,
        context: dict[str, Any] | None = None,
    ) -> EnterpriseIdentity | dict[str, Any]:
        """Validate the identity material and return normalized enterprise claims.

        Args:
            token_or_assertion: The identity material, as a string or a mapping.
            context: Optional keyword-only mapping of extra information.

        Returns:
            The normalized claims, as an ``EnterpriseIdentity`` or a plain dict.
        """
        ...
@dataclass(frozen=True)
class DirectoryGroupResolutionRequest:
    """Input for resolving group claims that are stale, partial, or overlarge.

    ``subject_id`` and ``issuer`` are required; the remaining fields default to
    fresh empty containers.
    """

    subject_id: str
    issuer: str
    groups: list[str] = field(default_factory=list)
    claims: dict[str, Any] = field(default_factory=dict)
    context: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the request as a plain dict without empty top-level entries."""
        snapshot = asdict(self)
        return _drop_empty(snapshot)
@dataclass(frozen=True)
class DirectoryGroupResolution:
    """Outcome of a directory group lookup, with freshness and source provenance.

    Every field is optional; a bare instance has no groups, no source, no
    refresh timestamp, ``overage`` set to ``False`` and empty metadata.
    """

    groups: list[str] = field(default_factory=list)
    source: str | None = None
    refreshed_at: str | None = None
    overage: bool = False
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the result as a plain dict without empty top-level entries.

        ``overage`` is always present in the output: ``False`` is not one of
        the values the empty-entry filter removes.
        """
        snapshot = asdict(self)
        return _drop_empty(snapshot)
class DirectoryGroupResolver(Protocol):
    """Structural interface for SCIM/LDAP/Graph/Keycloak group resolution.

    Any object exposing a compatible ``resolve`` method satisfies this protocol.
    """

    def resolve(
        self,
        request: DirectoryGroupResolutionRequest,
    ) -> DirectoryGroupResolution | dict[str, Any]:
        """Resolve groups for ``request``.

        Returns:
            The groups plus freshness/provenance metadata, as a
            ``DirectoryGroupResolution`` or a plain dict.
        """
        ...
@dataclass(frozen=True)
class EnterprisePolicyMapRequest:
    """Input for mapping enterprise claims onto Markitect policy vocabulary.

    ``identity`` may be an ``EnterpriseIdentity`` or an already-plain dict; the
    other fields default to fresh empty containers.
    """

    identity: EnterpriseIdentity | dict[str, Any]
    policy_map: dict[str, Any] = field(default_factory=dict)
    groups: list[str] = field(default_factory=list)
    context: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the request as a plain dict without empty top-level entries.

        When ``identity`` is an ``EnterpriseIdentity`` its own ``to_dict()``
        output replaces the generic ``asdict`` rendering, so the identity entry
        carries whatever that method emits.
        """
        payload = asdict(self)
        identity = self.identity
        if isinstance(identity, EnterpriseIdentity):
            payload["identity"] = identity.to_dict()
        return _drop_empty(payload)
class EnterprisePolicyMapper(Protocol):
    """Structural interface for mapping IAM roles/groups/scopes to policy subjects.

    Any object exposing a compatible ``map_subject`` method satisfies this
    protocol.
    """

    def map_subject(
        self,
        request: EnterprisePolicyMapRequest,
    ) -> PolicySubject | dict[str, Any]:
        """Map ``request`` to a gateway-ready subject.

        Returns:
            A subject with labels, zones, and actions, as a ``PolicySubject``
            or a plain dict.
        """
        ...
class DecisionLogStore(Protocol):
    """Structural interface for a persistent audit log of policy decisions.

    Any object exposing compatible ``record`` and ``get`` methods satisfies this
    protocol.
    """

    def record(
        self,
        decision: PolicyDecision,
        *,
        context: dict[str, Any] | None = None,
    ) -> str:
        """Persist ``decision`` and return its durable audit id.

        Args:
            decision: The policy decision to store.
            context: Optional keyword-only mapping of extra information.
        """
        ...

    def get(self, decision_id: str) -> dict[str, Any] | None:
        """Return the recorded decision for ``decision_id`` when available.

        The return annotation allows ``None`` for a decision that cannot be
        returned.
        """
        ...
@dataclass(frozen=True)
class RelationshipPolicyRequest:
    """Relationship-based authorization request.

    The shape maps cleanly to Zanzibar/OpenFGA/SpiceDB-style checks without
    binding Markitect core to one service or tuple schema.

    ``subject``, ``relation`` and ``object_id`` are required; ``namespace``
    defaults to ``None`` and ``context`` to a fresh empty dict.
    """

    subject: str
    relation: str
    object_id: str
    namespace: str | None = None
    context: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the request as a plain dict without empty top-level entries."""
        snapshot = asdict(self)
        return _drop_empty(snapshot)
class RelationshipPolicyAdapter(Protocol):
    """Structural interface for relationship authorization systems.

    Any object exposing a compatible ``check`` method satisfies this protocol.
    """

    def check(
        self,
        request: RelationshipPolicyRequest,
    ) -> PolicyDecision | dict[str, Any]:
        """Run a relationship check for ``request`` and return the policy decision.

        The decision may be a ``PolicyDecision`` or a plain dict.
        """
        ...
@dataclass(frozen=True)
class RulePolicyRequest:
    """Attribute/rule policy evaluation request.

    The shape can be mapped to OPA/Rego, Cedar, or local policy-as-data
    engines.

    ``subject``, ``action`` and ``object`` are required; ``context`` defaults to
    a fresh empty dict and ``policy_id`` to ``None``.
    """

    subject: dict[str, Any]
    action: str
    # The field name shadows the ``object`` builtin inside the class body only;
    # it is part of the constructor's keyword interface.
    object: dict[str, Any]
    context: dict[str, Any] = field(default_factory=dict)
    policy_id: str | None = None

    def to_dict(self) -> dict[str, Any]:
        """Return the request as a plain dict without empty top-level entries."""
        snapshot = asdict(self)
        return _drop_empty(snapshot)
class RulePolicyAdapter(Protocol):
    """Structural interface for rule/attribute policy systems.

    Any object exposing a compatible ``evaluate`` method satisfies this
    protocol.
    """

    def evaluate(
        self,
        request: RulePolicyRequest,
    ) -> PolicyDecision | dict[str, Any]:
        """Evaluate the rules for ``request`` and return the policy decision.

        The decision may be a ``PolicyDecision`` or a plain dict.
        """
        ...
def _drop_empty(data: dict[str, Any]) -> dict[str, Any]:
    """Return a new dict holding only the entries of ``data`` that carry a value.

    An entry is left out when its value is ``None`` or compares equal to an
    empty list, an empty dict, or an empty string. Other falsy values such as
    ``0``, ``False`` and ``()`` are kept. Only the top level is filtered;
    nested containers are returned untouched. Key order follows ``data``.
    """
    blanks = (None, [], {}, "")
    kept: dict[str, Any] = {}
    for name, item in data.items():
        if item in blanks:
            continue
        kept[name] = item
    return kept