# Files
# user-engine/src/user_engine/ports.py

# 316 lines
# 11 KiB
# Python

"""Implementation ports for user-engine adapters.
The domain layer should depend on these protocols. Concrete implementations
can be local test adapters, HTTP clients, database-backed stores, or platform
adapters without changing domain code.
"""
from __future__ import annotations
from contextlib import AbstractContextManager
from typing import Any, Iterable, Mapping, Protocol
from user_engine.domain import (
Account,
AccessControlFact,
AccessProfile,
ActiveAccessContext,
Actor,
Application,
ApplicationBinding,
AuditRecord,
AuthorizationDecision,
AuthorizationRequest,
CanonEntityReference,
Catalog,
ExternalIdentity,
FactorVerification,
FamilyInvitation,
IdentityFactor,
Membership,
OutboxEvent,
PreparedAccount,
ProfileValue,
RegistrationSession,
TenantAccount,
User,
)
class UserEngineStore(Protocol):
    """Durable persistence boundary for user-engine service behavior.

    Implementations may be in-memory, Postgres-backed, or platform-provided,
    but must preserve the same logical keys, readiness contract, and atomic
    mutation semantics exposed here.
    """

    # Schema version reported by the store; the annotation permits None.
    schema_version: str | None

    # --- Readiness, migration, and transactions -------------------------

    @property
    def ready(self) -> bool:
        """Return whether the store is schema-compatible for service use."""

    def migrate(self) -> None:
        """Apply or verify user-engine-owned schema migrations."""

    def transaction(self) -> AbstractContextManager[None]:
        """Return a context manager for one atomic mutation unit."""

    # --- Users and primary accounts -------------------------------------

    def save_user(self, user: User) -> None:
        """Create or replace a user record."""

    def user(self, user_id: str) -> User | None:
        """Return a user by id."""

    def save_account(self, account: Account) -> None:
        """Create or replace a primary account record."""

    def user_account(self, user_id: str) -> Account | None:
        """Return the primary account for a user."""

    # --- External identities --------------------------------------------

    def save_identity(self, identity: ExternalIdentity) -> None:
        """Create or replace an external identity link."""

    def find_identity(self, issuer: str, subject: str) -> ExternalIdentity | None:
        """Return an external identity by issuer and subject."""

    def identities_for_user(self, user_id: str) -> tuple[ExternalIdentity, ...]:
        """Return all external identities linked to a user."""

    # --- Tenant accounts and memberships --------------------------------

    def save_tenant_account(self, account: TenantAccount) -> None:
        """Create or replace a tenant-scoped account record."""

    def tenant_account(self, tenant: str, user_id: str) -> TenantAccount | None:
        """Return a tenant-scoped account record."""

    def save_membership(self, membership: Membership) -> None:
        """Create or replace a membership fact."""

    def memberships_for_user(
        self, user_id: str, *, tenant: str | None = None
    ) -> tuple[Membership, ...]:
        """Return memberships for a user, optionally scoped to a tenant."""

    def memberships_for_tenant(self, tenant: str) -> tuple[Membership, ...]:
        """Return memberships scoped to a tenant."""

    # --- Applications and bindings --------------------------------------

    def save_application(self, application: Application) -> None:
        """Create or replace an application registration."""

    def application(self, application_id: str) -> Application | None:
        """Return an application by id."""

    def save_binding(self, binding: ApplicationBinding) -> None:
        """Create or replace an application binding."""

    def binding(self, application_id: str) -> ApplicationBinding | None:
        """Return an application binding by application id."""

    # --- Catalogs -------------------------------------------------------

    def save_catalog(self, catalog: Catalog) -> None:
        """Create or replace a catalog."""

    def catalog(self, catalog_id: str) -> Catalog | None:
        """Return a catalog by id."""

    def all_catalogs(self) -> tuple[Catalog, ...]:
        """Return all catalogs."""

    # --- Family invitations ---------------------------------------------

    def save_family_invitation(self, invitation: FamilyInvitation) -> None:
        """Create or replace a family invitation."""

    def family_invitation(self, invitation_id: str) -> FamilyInvitation | None:
        """Return a family invitation by id."""

    def family_invitations_for_user(
        self, user_id: str
    ) -> tuple[FamilyInvitation, ...]:
        """Return family invitations for a user."""

    # --- Registration sessions and identity factors ---------------------

    def save_registration_session(self, session: RegistrationSession) -> None:
        """Create or replace a registration session."""

    def registration_session(
        self, registration_id: str
    ) -> RegistrationSession | None:
        """Return a registration session by id."""

    def all_registration_sessions(self) -> tuple[RegistrationSession, ...]:
        """Return all registration sessions."""

    def save_identity_factor(self, factor: IdentityFactor) -> None:
        """Create or replace a verified identity factor."""

    def identity_factor(self, factor_id: str) -> IdentityFactor | None:
        """Return a verified identity factor by id."""

    def factors_for_registration(
        self, registration_id: str
    ) -> tuple[IdentityFactor, ...]:
        """Return verified factors attached to a registration session."""

    def factors_for_user(self, user_id: str) -> tuple[IdentityFactor, ...]:
        """Return verified factors attached to a user."""

    # --- Prepared accounts ----------------------------------------------

    def save_prepared_account(self, account: PreparedAccount) -> None:
        """Create or replace a prepared account package."""

    def prepared_account(self, prepared_account_id: str) -> PreparedAccount | None:
        """Return a prepared account package by id."""

    def prepared_accounts_for_tenant(
        self, tenant: str
    ) -> tuple[PreparedAccount, ...]:
        """Return prepared account packages for a tenant."""

    # --- Access profiles and active access contexts ---------------------

    def save_access_profile(self, profile: AccessProfile) -> None:
        """Create or replace an access profile template."""

    def access_profile(self, access_profile_id: str) -> AccessProfile | None:
        """Return an access profile template by id."""

    def access_profiles_for_tenant(self, tenant: str) -> tuple[AccessProfile, ...]:
        """Return access profile templates for a tenant."""

    def save_active_access_context(self, context: ActiveAccessContext) -> None:
        """Create or replace the user's active access context for a tenant."""

    def active_access_context(
        self, user_id: str, tenant: str
    ) -> ActiveAccessContext | None:
        """Return the user's active access context for a tenant."""

    def active_access_contexts_for_tenant(
        self, tenant: str
    ) -> tuple[ActiveAccessContext, ...]:
        """Return active access contexts for a tenant."""

    # --- Profile values -------------------------------------------------

    def save_profile_value(self, value: ProfileValue) -> None:
        """Create or replace a profile value."""

    def values_for_user(self, user_id: str) -> tuple[ProfileValue, ...]:
        """Return profile values for a user."""

    # --- Audit, outbox, and diagnostics ---------------------------------

    def append_audit(self, record: AuditRecord) -> None:
        """Append a local audit record."""

    def audit_log(self) -> tuple[AuditRecord, ...]:
        """Return local audit records in write order."""

    def append_outbox(self, event: OutboxEvent) -> None:
        """Append an outbox event."""

    def pending_outbox(self) -> tuple[OutboxEvent, ...]:
        """Return pending outbox events in write order."""

    def record_counts(self) -> Mapping[str, int]:
        """Return adapter-neutral record counts for diagnostics."""
class IdentityClaimsAdapter(Protocol):
    """Normalize verified identity claims into a user-engine actor."""

    def normalize(self, claims: Mapping[str, Any]) -> Actor:
        """Return a normalized actor from already-verified claims."""

    def identity_key(self, actor: Actor) -> tuple[str, str]:
        """Return the stable external identity link key."""
class FactorVerificationAdapter(Protocol):
    """Normalize external proofing results into safe factor evidence."""

    def normalize(self, proofing_result: Mapping[str, Any]) -> FactorVerification:
        """Return normalized verified factor evidence without secret payloads."""
class AuthorizationCheckPort(Protocol):
    """Ask whether an actor may perform an action."""

    def check(self, request: AuthorizationRequest) -> AuthorizationDecision:
        """Return the authorization decision for one request."""

    def batch_check(
        self, requests: Iterable[AuthorizationRequest]
    ) -> tuple[AuthorizationDecision, ...]:
        """Return decisions in request order."""
class ApplicationBindingStore(Protocol):
    """Store links between user-engine applications and external systems."""

    def get(self, application_id: str) -> ApplicationBinding | None:
        """Return a binding by user-engine application id."""

    def save(self, binding: ApplicationBinding) -> None:
        """Create or replace an application binding."""
class MembershipFactExporter(Protocol):
    """Export membership facts as read models for authorization systems."""

    def export(self, memberships: Iterable[Membership]) -> Mapping[str, Any]:
        """Return an adapter-neutral membership fact manifest."""
class AccessControlFactExporter(Protocol):
    """Export access-control facts to an external policy or ACL system."""

    def export(self, facts: Iterable[AccessControlFact]) -> Mapping[str, Any]:
        """Return an adapter-neutral access-control fact manifest."""
class EventOutbox(Protocol):
    """Persist and publish durable domain events."""

    def append(self, event: OutboxEvent) -> None:
        """Append an event in the same unit of work as its mutation."""

    def pending(self) -> tuple[OutboxEvent, ...]:
        """Return events waiting for delivery."""
class AuditWriter(Protocol):
    """Persist local audit records and support platform audit export."""

    def record(self, audit_record: AuditRecord) -> None:
        """Persist an audit record."""
class EvidenceReferenceExporter(Protocol):
    """Export audit/review material as identity-canon evidence references."""

    def export(
        self, audit_records: Iterable[AuditRecord]
    ) -> tuple[CanonEntityReference, ...]:
        """Return evidence references without owning the platform audit sink."""
class PolicyControlReferenceResolver(Protocol):
    """Resolve policy/control references for identity-domain traces."""

    def references_for(
        self, request: AuthorizationRequest, decision: AuthorizationDecision
    ) -> Mapping[str, CanonEntityReference]:
        """Return policy, control, review, or exception references when known."""
class LifecycleTaskSink(Protocol):
    """Handoff identity-domain gaps or lifecycle work to a task system."""

    def create_or_link(
        self,
        *,
        summary: str,
        subject: CanonEntityReference,
        evidence: Iterable[CanonEntityReference] = (),
    ) -> CanonEntityReference:
        """Return the task reference created or linked by the downstream system."""
class SecretProvider(Protocol):
    """Load runtime secret material from the active environment."""

    def get(self, name: str) -> str:
        """Return a secret value by logical name."""