Implement isolated user-engine MVP

This commit is contained in:
2026-05-22 21:20:19 +02:00
parent b77324d7b1
commit 5203d9f45f
13 changed files with 1196 additions and 13 deletions

View File

@@ -1,5 +1,7 @@
"""Headless user-domain and profile engine."""
__all__ = ["__version__"]
from user_engine.service import UserEngineService
__all__ = ["UserEngineService", "__version__"]
__version__ = "0.0.0"

View File

@@ -0,0 +1,11 @@
"""Local adapters for standalone user-engine setups."""
from user_engine.adapters.local import (
InMemoryUserEngineStore,
LocalAuthorizationCheckPort,
)
__all__ = [
"InMemoryUserEngineStore",
"LocalAuthorizationCheckPort",
]

View File

@@ -0,0 +1,123 @@
"""Standalone adapters for isolated user-engine deployments and tests."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Iterable
from user_engine.domain import (
Account,
Application,
ApplicationBinding,
AuditRecord,
AuthorizationDecision,
AuthorizationEffect,
AuthorizationRequest,
Catalog,
ExternalIdentity,
OutboxEvent,
ProfileScope,
ProfileValue,
User,
)
SCHEMA_VERSION = "0001_initial"
@dataclass
class InMemoryUserEngineStore:
"""Small local persistence adapter with explicit schema state.
The store is intentionally simple and deterministic. It gives the MVP a
concrete persistence boundary without committing later implementation work
to a specific database driver.
"""
schema_version: str | None = None
users: dict[str, User] = field(default_factory=dict)
accounts: dict[str, Account] = field(default_factory=dict)
identities: dict[tuple[str, str], ExternalIdentity] = field(default_factory=dict)
applications: dict[str, Application] = field(default_factory=dict)
bindings: dict[str, ApplicationBinding] = field(default_factory=dict)
catalogs: dict[str, Catalog] = field(default_factory=dict)
profile_values: dict[
tuple[str, str, ProfileScope, str | None], ProfileValue
] = field(default_factory=dict)
audit_records: list[AuditRecord] = field(default_factory=list)
outbox_events: list[OutboxEvent] = field(default_factory=list)
def migrate(self) -> None:
"""Apply the standalone schema migration manifest."""
self.schema_version = SCHEMA_VERSION
@property
def ready(self) -> bool:
return self.schema_version == SCHEMA_VERSION
def save_user(self, user: User) -> None:
self.users[user.user_id] = user
def save_account(self, account: Account) -> None:
self.accounts[account.user_id] = account
def save_identity(self, identity: ExternalIdentity) -> None:
self.identities[identity.identity_key] = identity
def save_application(self, application: Application) -> None:
self.applications[application.application_id] = application
def save_binding(self, binding: ApplicationBinding) -> None:
self.bindings[binding.application_id] = binding
def save_catalog(self, catalog: Catalog) -> None:
self.catalogs[catalog.catalog_id] = catalog
def save_profile_value(self, value: ProfileValue) -> None:
self.profile_values[
(value.user_id, value.attribute_key, value.scope, value.scope_id)
] = value
def find_identity(self, issuer: str, subject: str) -> ExternalIdentity | None:
return self.identities.get((issuer, subject))
def user_account(self, user_id: str) -> Account | None:
return self.accounts.get(user_id)
def values_for_user(self, user_id: str) -> tuple[ProfileValue, ...]:
return tuple(
value for value in self.profile_values.values() if value.user_id == user_id
)
def append_audit(self, record: AuditRecord) -> None:
self.audit_records.append(record)
def append_outbox(self, event: OutboxEvent) -> None:
self.outbox_events.append(event)
class LocalAuthorizationCheckPort:
"""Deterministic local authorization adapter.
Rules are action-specific. The default is allow so isolated tests and local
demos can focus on user-engine behavior while still exercising the port.
"""
def __init__(
self,
*,
default_effect: AuthorizationEffect = AuthorizationEffect.ALLOW,
action_effects: dict[str, AuthorizationEffect] | None = None,
) -> None:
self.default_effect = default_effect
self.action_effects = action_effects or {}
self.requests: list[AuthorizationRequest] = []
def check(self, request: AuthorizationRequest) -> AuthorizationDecision:
self.requests.append(request)
effect = self.action_effects.get(request.action, self.default_effect)
return AuthorizationDecision(effect=effect, reason="local")
def batch_check(
self, requests: Iterable[AuthorizationRequest]
) -> tuple[AuthorizationDecision, ...]:
return tuple(self.check(request) for request in requests)

View File

@@ -14,6 +14,7 @@ from user_engine.domain.models import (
Catalog,
CatalogLifecycle,
ExternalIdentity,
ManagementMode,
Membership,
Mutability,
OutboxEvent,
@@ -24,6 +25,8 @@ from user_engine.domain.models import (
Sensitivity,
User,
Visibility,
new_id,
utc_now,
)
__all__ = [
@@ -40,6 +43,7 @@ __all__ = [
"Catalog",
"CatalogLifecycle",
"ExternalIdentity",
"ManagementMode",
"Membership",
"Mutability",
"OutboxEvent",
@@ -50,4 +54,6 @@ __all__ = [
"Sensitivity",
"User",
"Visibility",
"new_id",
"utc_now",
]

23
src/user_engine/errors.py Normal file
View File

@@ -0,0 +1,23 @@
"""Typed exceptions raised by user-engine services."""
from __future__ import annotations
class UserEngineError(Exception):
"""Base class for user-engine failures."""
class AuthorizationDenied(UserEngineError):
"""Raised when the authorization port denies an operation."""
class ConflictError(UserEngineError):
"""Raised when a write would violate a uniqueness constraint."""
class NotFoundError(UserEngineError):
"""Raised when a requested domain object does not exist."""
class ValidationError(UserEngineError):
"""Raised when input fails domain validation."""

750
src/user_engine/service.py Normal file
View File

@@ -0,0 +1,750 @@
"""Headless application service for the isolated user-engine MVP."""
from __future__ import annotations
from dataclasses import dataclass, replace
from typing import Any, Mapping
from user_engine.adapters.local import InMemoryUserEngineStore
from user_engine.domain import (
Account,
AccountStatus,
Actor,
Application,
ApplicationBinding,
AttributeDefinition,
AuditRecord,
AuthorizationRequest,
Catalog,
CatalogLifecycle,
ExternalIdentity,
Mutability,
OutboxEvent,
ProfileScope,
ProfileValue,
ProjectionType,
Sensitivity,
User,
Visibility,
new_id,
)
from user_engine.errors import (
AuthorizationDenied,
ConflictError,
NotFoundError,
ValidationError,
)
from user_engine.ports import AuthorizationCheckPort, IdentityClaimsAdapter
REDACTED = "<redacted>"
@dataclass(frozen=True)
class HealthReport:
status: str
schema_version: str | None
@dataclass(frozen=True)
class ReadinessReport:
ready: bool
checks: Mapping[str, bool]
@dataclass(frozen=True)
class UserSession:
actor: Actor
user: User
account: Account
identities: tuple[ExternalIdentity, ...]
@dataclass(frozen=True)
class EffectiveProfile:
user_id: str
application_id: str | None
values: Mapping[str, Any]
source_layers: Mapping[str, str]
trace: Mapping[str, tuple[str, ...]]
@dataclass(frozen=True)
class Projection:
user_id: str
projection_type: ProjectionType
application_id: str | None
values: Mapping[str, Any]
redactions: Mapping[str, str]
class UserEngineService:
"""Headless service API for isolated user and profile management."""
def __init__(
self,
*,
store: InMemoryUserEngineStore,
identity_adapter: IdentityClaimsAdapter,
authorization: AuthorizationCheckPort,
) -> None:
self.store = store
self.identity_adapter = identity_adapter
self.authorization = authorization
if not self.store.ready:
self.store.migrate()
def health(self) -> HealthReport:
return HealthReport(status="ok", schema_version=self.store.schema_version)
def readiness(self) -> ReadinessReport:
return ReadinessReport(
ready=self.store.ready,
checks={
"store_schema": self.store.ready,
"identity_adapter": self.identity_adapter is not None,
"authorization_port": self.authorization is not None,
},
)
def me(
self,
claims: Mapping[str, Any],
*,
correlation_id: str | None = None,
) -> UserSession:
actor = self.identity_adapter.normalize(claims)
correlation_id = correlation_id or new_id("corr")
decision = self._authorize(
actor,
action="me.read",
resource_type="user-engine:me",
resource_id=actor.subject,
tenant=actor.tenant,
correlation_id=correlation_id,
)
identity = self.store.find_identity(*actor.identity_key)
if identity is not None:
user = self._require_user(identity.user_id)
account = self._require_account(user.user_id)
return self._session(actor, user, account)
user = User(
display_name=actor.preferred_username,
primary_email=_optional_claim(actor, "email"),
)
account = Account(
account_id=new_id("acct"),
user_id=user.user_id,
status=AccountStatus.ACTIVE,
)
identity = ExternalIdentity(
identity_id=new_id("idn"),
user_id=user.user_id,
issuer=actor.issuer,
subject=actor.subject,
provider=actor.authorized_party,
)
self.store.save_user(user)
self.store.save_account(account)
self.store.save_identity(identity)
self._record_mutation(
actor,
action="user.create_from_identity",
subject=user.user_id,
tenant=actor.tenant,
correlation_id=correlation_id,
decision_id=decision.decision_id,
event_type="user.created",
aggregate_id=user.user_id,
payload={
"user_id": user.user_id,
"account_id": account.account_id,
"identity": {"issuer": actor.issuer, "subject": actor.subject},
},
)
return self._session(actor, user, account)
def create_user(
self,
actor: Actor,
*,
display_name: str | None,
primary_email: str | None,
correlation_id: str | None = None,
) -> User:
correlation_id = correlation_id or new_id("corr")
decision = self._authorize(
actor,
action="user.create",
resource_type="user-engine:user",
resource_id="new",
tenant=actor.tenant,
correlation_id=correlation_id,
)
user = User(display_name=display_name, primary_email=primary_email)
account = Account(account_id=new_id("acct"), user_id=user.user_id)
self.store.save_user(user)
self.store.save_account(account)
self._record_mutation(
actor,
action="user.create",
subject=user.user_id,
tenant=actor.tenant,
correlation_id=correlation_id,
decision_id=decision.decision_id,
event_type="user.created",
aggregate_id=user.user_id,
payload={"user_id": user.user_id, "account_id": account.account_id},
)
return user
def set_account_status(
self,
actor: Actor,
user_id: str,
status: AccountStatus,
*,
correlation_id: str | None = None,
) -> Account:
account = self._require_account(user_id)
correlation_id = correlation_id or new_id("corr")
decision = self._authorize(
actor,
action="account.update",
resource_type="user-engine:account",
resource_id=account.account_id,
tenant=actor.tenant,
correlation_id=correlation_id,
target_user_id=user_id,
)
updated = replace(account, status=status)
self.store.save_account(updated)
self._record_mutation(
actor,
action="account.update",
subject=user_id,
tenant=actor.tenant,
correlation_id=correlation_id,
decision_id=decision.decision_id,
event_type="account.status_changed",
aggregate_id=account.account_id,
payload={"user_id": user_id, "status": status},
)
return updated
def link_identity(
self,
actor: Actor,
user_id: str,
*,
issuer: str,
subject: str,
provider: str | None = None,
correlation_id: str | None = None,
) -> ExternalIdentity:
self._require_user(user_id)
existing = self.store.find_identity(issuer, subject)
if existing is not None and existing.user_id != user_id:
raise ConflictError("external identity is already linked")
if existing is not None:
return existing
correlation_id = correlation_id or new_id("corr")
decision = self._authorize(
actor,
action="identity.link",
resource_type="user-engine:identity",
resource_id=f"{issuer}:{subject}",
tenant=actor.tenant,
correlation_id=correlation_id,
target_user_id=user_id,
)
identity = ExternalIdentity(
identity_id=new_id("idn"),
user_id=user_id,
issuer=issuer,
subject=subject,
provider=provider,
)
self.store.save_identity(identity)
self._record_mutation(
actor,
action="identity.link",
subject=user_id,
tenant=actor.tenant,
correlation_id=correlation_id,
decision_id=decision.decision_id,
event_type="identity.linked",
aggregate_id=user_id,
payload={"issuer": issuer, "subject": subject, "user_id": user_id},
)
return identity
def register_application(
self,
actor: Actor,
application: Application,
*,
binding: ApplicationBinding | None = None,
correlation_id: str | None = None,
) -> Application:
if application.application_id in self.store.applications:
raise ConflictError("application already exists")
correlation_id = correlation_id or new_id("corr")
decision = self._authorize(
actor,
action="application.register",
resource_type="user-engine:application",
resource_id=application.application_id,
tenant=actor.tenant,
correlation_id=correlation_id,
application_id=application.application_id,
)
self.store.save_application(application)
if binding is not None:
if binding.application_id != application.application_id:
raise ValidationError("binding application id must match application")
self.store.save_binding(binding)
self._record_mutation(
actor,
action="application.register",
subject=application.application_id,
tenant=actor.tenant,
correlation_id=correlation_id,
decision_id=decision.decision_id,
event_type="application.registered",
aggregate_id=application.application_id,
payload={"application_id": application.application_id},
application_id=application.application_id,
)
return application
def publish_catalog(
self,
actor: Actor,
catalog: Catalog,
*,
correlation_id: str | None = None,
) -> Catalog:
self._validate_catalog(catalog)
correlation_id = correlation_id or new_id("corr")
decision = self._authorize(
actor,
action="catalog.publish",
resource_type="user-engine:catalog",
resource_id=catalog.catalog_id,
tenant=actor.tenant,
correlation_id=correlation_id,
application_id=catalog.owning_application_id,
)
self.store.save_catalog(catalog)
self._record_mutation(
actor,
action="catalog.publish",
subject=catalog.catalog_id,
tenant=actor.tenant,
correlation_id=correlation_id,
decision_id=decision.decision_id,
event_type="catalog.published",
aggregate_id=catalog.catalog_id,
payload={
"catalog_id": catalog.catalog_id,
"namespace": catalog.namespace,
"version": catalog.version,
},
application_id=catalog.owning_application_id,
)
return catalog
def set_profile_value(
self,
actor: Actor,
user_id: str,
attribute_key: str,
value: Any,
*,
scope: ProfileScope = ProfileScope.GLOBAL,
scope_id: str | None = None,
application_id: str | None = None,
correlation_id: str | None = None,
) -> ProfileValue:
self._require_user(user_id)
definition = self._require_attribute(attribute_key)
self._validate_profile_scope(definition, scope, scope_id)
self._validate_value(definition, value)
correlation_id = correlation_id or new_id("corr")
decision = self._authorize(
actor,
action="profile.write",
resource_type="user-engine:profile",
resource_id=f"{user_id}:{attribute_key}",
tenant=actor.tenant,
correlation_id=correlation_id,
application_id=application_id,
target_user_id=user_id,
context={"scope": scope, "scope_id": scope_id},
)
profile_value = ProfileValue(
user_id=user_id,
attribute_key=attribute_key,
value=value,
scope=scope,
scope_id=scope_id,
source=f"{actor.issuer}:{actor.subject}",
)
self.store.save_profile_value(profile_value)
self._record_mutation(
actor,
action="profile.write",
subject=user_id,
tenant=actor.tenant,
correlation_id=correlation_id,
decision_id=decision.decision_id,
event_type="profile.value_set",
aggregate_id=user_id,
payload={
"user_id": user_id,
"attribute_key": attribute_key,
"scope": scope,
"scope_id": scope_id,
},
application_id=application_id,
)
return profile_value
def effective_profile(
self,
actor: Actor,
user_id: str,
*,
application_id: str | None = None,
correlation_id: str | None = None,
) -> EffectiveProfile:
correlation_id = correlation_id or new_id("corr")
self._authorize(
actor,
action="profile.read",
resource_type="user-engine:profile",
resource_id=user_id,
tenant=actor.tenant,
correlation_id=correlation_id,
application_id=application_id,
target_user_id=user_id,
)
return self._resolve_effective_profile(user_id, application_id)
def projection(
self,
actor: Actor,
user_id: str,
projection_type: ProjectionType,
*,
application_id: str | None = None,
correlation_id: str | None = None,
) -> Projection:
correlation_id = correlation_id or new_id("corr")
self._authorize(
actor,
action="projection.read",
resource_type="user-engine:projection",
resource_id=user_id,
tenant=actor.tenant,
correlation_id=correlation_id,
application_id=application_id,
target_user_id=user_id,
context={"projection_type": projection_type},
)
effective = self._resolve_effective_profile(user_id, application_id)
values: dict[str, Any] = {}
redactions: dict[str, str] = {}
for key, definition in self._active_attribute_definitions().items():
if key not in effective.values:
continue
if not _visible_in_projection(definition, projection_type):
continue
if _must_redact(definition, projection_type):
values[key] = REDACTED
redactions[key] = "sensitivity"
else:
values[key] = effective.values[key]
return Projection(
user_id=user_id,
projection_type=projection_type,
application_id=application_id,
values=values,
redactions=redactions,
)
def audit_records(self) -> tuple[AuditRecord, ...]:
return tuple(self.store.audit_records)
def outbox_events(self) -> tuple[OutboxEvent, ...]:
return tuple(self.store.outbox_events)
def _session(self, actor: Actor, user: User, account: Account) -> UserSession:
identities = tuple(
identity
for identity in self.store.identities.values()
if identity.user_id == user.user_id
)
return UserSession(actor=actor, user=user, account=account, identities=identities)
def _authorize(
self,
actor: Actor,
*,
action: str,
resource_type: str,
resource_id: str,
tenant: str,
correlation_id: str,
application_id: str | None = None,
target_user_id: str | None = None,
context: Mapping[str, Any] | None = None,
):
request = AuthorizationRequest(
actor=actor,
resource_type=resource_type,
resource_id=resource_id,
action=action,
tenant=tenant,
correlation_id=correlation_id,
application_id=application_id,
target_user_id=target_user_id,
context=context or {},
)
decision = self.authorization.check(request)
if not decision.allowed:
self.store.append_audit(
AuditRecord(
audit_id=new_id("aud"),
actor=actor,
action=action,
subject=resource_id,
tenant=tenant,
correlation_id=correlation_id,
decision_id=decision.decision_id,
application_id=application_id,
summary="authorization denied",
)
)
raise AuthorizationDenied(decision.reason or "authorization denied")
return decision
def _record_mutation(
self,
actor: Actor,
*,
action: str,
subject: str,
tenant: str,
correlation_id: str,
decision_id: str | None,
event_type: str,
aggregate_id: str,
payload: Mapping[str, Any],
application_id: str | None = None,
) -> None:
self.store.append_audit(
AuditRecord(
audit_id=new_id("aud"),
actor=actor,
action=action,
subject=subject,
tenant=tenant,
correlation_id=correlation_id,
decision_id=decision_id,
application_id=application_id,
summary=event_type,
)
)
self.store.append_outbox(
OutboxEvent(
event_id=new_id("evt"),
event_type=event_type,
aggregate_id=aggregate_id,
payload=dict(payload),
tenant=tenant,
correlation_id=correlation_id,
)
)
def _resolve_effective_profile(
self, user_id: str, application_id: str | None
) -> EffectiveProfile:
self._require_user(user_id)
values: dict[str, Any] = {}
source_layers: dict[str, str] = {}
trace: dict[str, list[str]] = {}
definitions = self._active_attribute_definitions()
for key, definition in definitions.items():
if definition.default is not None:
values[key] = definition.default
source_layers[key] = "catalog_default"
trace[key] = ["catalog_default"]
profile_values = self.store.values_for_user(user_id)
for profile_value in profile_values:
if profile_value.attribute_key not in definitions:
continue
if profile_value.scope != ProfileScope.GLOBAL:
continue
values[profile_value.attribute_key] = profile_value.value
source_layers[profile_value.attribute_key] = "global"
trace.setdefault(profile_value.attribute_key, []).append("global")
for profile_value in profile_values:
if profile_value.attribute_key not in definitions:
continue
if profile_value.scope != ProfileScope.APPLICATION:
continue
if profile_value.scope_id != application_id:
continue
values[profile_value.attribute_key] = profile_value.value
layer = f"application:{application_id}"
source_layers[profile_value.attribute_key] = layer
trace.setdefault(profile_value.attribute_key, []).append(layer)
return EffectiveProfile(
user_id=user_id,
application_id=application_id,
values=values,
source_layers=source_layers,
trace={key: tuple(layers) for key, layers in trace.items()},
)
def _validate_catalog(self, catalog: Catalog) -> None:
if catalog.owning_application_id not in self.store.applications:
raise ValidationError("catalog owning application is not registered")
if catalog.lifecycle != CatalogLifecycle.ACTIVE:
raise ValidationError("only active catalogs can be published")
if not catalog.attributes:
raise ValidationError("catalog must define at least one attribute")
keys = [attribute.key for attribute in catalog.attributes]
if len(keys) != len(set(keys)):
raise ValidationError("catalog attribute keys must be unique")
binding = self.store.bindings.get(catalog.owning_application_id)
if binding and catalog.namespace not in binding.catalog_namespaces:
raise ValidationError("catalog namespace is not bound to application")
active_definitions = self._active_attribute_definitions(
excluding_catalog_id=catalog.catalog_id
)
for attribute in catalog.attributes:
if not attribute.key.startswith(f"{catalog.namespace}."):
raise ValidationError("attribute keys must use the catalog namespace")
if attribute.key in active_definitions:
raise ConflictError("active catalog already owns attribute key")
if not attribute.visibility:
raise ValidationError("attribute must declare visibility")
if not attribute.mutability:
raise ValidationError("attribute must declare mutability")
if Mutability.READ_ONLY in attribute.mutability and len(attribute.mutability) > 1:
raise ValidationError("read-only mutability cannot be combined")
if attribute.default is not None:
self._validate_value(attribute, attribute.default)
def _validate_profile_scope(
self,
definition: AttributeDefinition,
scope: ProfileScope,
scope_id: str | None,
) -> None:
allowed_scopes = {ProfileScope.GLOBAL, ProfileScope.APPLICATION, definition.scope}
if scope not in allowed_scopes:
raise ValidationError("profile value scope is not allowed for attribute")
if scope == ProfileScope.APPLICATION:
if scope_id is None:
raise ValidationError("application profile values require scope_id")
if scope_id not in self.store.applications:
raise ValidationError("application profile scope_id is not registered")
elif scope_id is not None:
raise ValidationError("only application scoped values may set scope_id")
def _validate_value(self, definition: AttributeDefinition, value: Any) -> None:
if value is None:
return
validators = {
"string": lambda item: isinstance(item, str),
"integer": lambda item: isinstance(item, int) and not isinstance(item, bool),
"number": lambda item: isinstance(item, (int, float))
and not isinstance(item, bool),
"boolean": lambda item: isinstance(item, bool),
"object": lambda item: isinstance(item, Mapping),
"array": lambda item: isinstance(item, (list, tuple)),
}
validator = validators.get(definition.value_type)
if validator is None:
raise ValidationError(f"unsupported attribute value_type: {definition.value_type}")
if not validator(value):
raise ValidationError(f"{definition.key} must be {definition.value_type}")
enum_values = definition.validation.get("enum")
if enum_values is not None and value not in enum_values:
raise ValidationError(f"{definition.key} is not an allowed value")
def _require_user(self, user_id: str) -> User:
user = self.store.users.get(user_id)
if user is None:
raise NotFoundError("user not found")
return user
def _require_account(self, user_id: str) -> Account:
account = self.store.user_account(user_id)
if account is None:
raise NotFoundError("account not found")
return account
def _require_attribute(self, attribute_key: str) -> AttributeDefinition:
definition = self._active_attribute_definitions().get(attribute_key)
if definition is None:
raise NotFoundError("active attribute definition not found")
return definition
def _active_attribute_definitions(
self, *, excluding_catalog_id: str | None = None
) -> dict[str, AttributeDefinition]:
definitions: dict[str, AttributeDefinition] = {}
for catalog in self.store.catalogs.values():
if catalog.catalog_id == excluding_catalog_id:
continue
if catalog.lifecycle != CatalogLifecycle.ACTIVE:
continue
for attribute in catalog.attributes:
definitions[attribute.key] = attribute
return dict(sorted(definitions.items()))
def _optional_claim(actor: Actor, key: str) -> str | None:
value = actor.claims.get(key)
if value is None:
return None
return str(value)
def _visible_in_projection(
definition: AttributeDefinition, projection_type: ProjectionType
) -> bool:
if projection_type == ProjectionType.AUDIT:
return True
visibility = {
ProjectionType.SELF_SERVICE: Visibility.USER,
ProjectionType.ADMIN: Visibility.ADMIN,
ProjectionType.APPLICATION_RUNTIME: Visibility.APPLICATION,
ProjectionType.AGENT_CONTEXT: Visibility.APPLICATION,
ProjectionType.CLAIMS_ENRICHMENT: Visibility.APPLICATION,
}.get(projection_type)
if visibility is None:
return False
return visibility in definition.visibility
def _must_redact(
definition: AttributeDefinition, projection_type: ProjectionType
) -> bool:
if projection_type in {
ProjectionType.ADMIN,
ProjectionType.AUDIT,
ProjectionType.SELF_SERVICE,
}:
return False
return definition.sensitivity in {Sensitivity.SENSITIVE, Sensitivity.SECRET}