Files
user-engine/tests/test_prepared_accounts.py

421 lines
15 KiB
Python

from datetime import timedelta
import unittest
from user_engine.adapters.local import InMemoryUserEngineStore, LocalAuthorizationCheckPort
from user_engine.domain import (
AccountStatus,
CanonEntityReference,
Catalog,
CatalogLifecycle,
FactorVerification,
IdentityFactorType,
PreparedAccount,
PreparedAccountStatus,
PreparedEntitlement,
PreparedEntitlementKind,
PreparedFactorRequirement,
ProfileScope,
utc_now,
)
from user_engine.errors import AuthorizationDenied, ConflictError, ValidationError
from user_engine.service import UserEngineService
from user_engine.testing.fixtures import (
FixtureIdentityClaimsAdapter,
human_actor_claims,
sample_application,
sample_application_binding,
sample_catalog,
)
class PreparedAccountTests(unittest.TestCase):
def test_claim_prepared_account_activates_entitlements(self):
service, store = _service()
preparer = _actor(subject="tenant-admin", roles=("tenant-admin",))
applicant = _actor(subject="new-user", email="sample.user@example.test")
_bootstrap_catalog(service, preparer)
prepared = _prepare_demo_account(service, preparer)
registration = _complete_registration(service, applicant)
claim = service.claim_prepared_account(
applicant,
registration.session.registration_id,
correlation_id="corr-claim",
)
self.assertEqual(claim.prepared_account.status, PreparedAccountStatus.CLAIMED)
self.assertEqual(claim.prepared_account.claimed_by_user_id, claim.user.user_id)
self.assertEqual(claim.tenant_accounts[0].status, AccountStatus.ACTIVE)
self.assertEqual(claim.memberships[0].scope_id, "team:demo")
self.assertEqual(claim.memberships[0].kind, "member")
self.assertEqual(claim.profile_values[0].attribute_key, "demo.display_density")
self.assertEqual(claim.profile_values[0].value, "compact")
self.assertEqual(claim.onboarding_journeys, ("welcome-demo",))
self.assertEqual(
store.prepared_account(prepared.prepared_account_id).status,
PreparedAccountStatus.CLAIMED,
)
self.assertIn(
"prepared_account.onboarding_requested",
[event.event_type for event in service.outbox_events()],
)
self.assertNotIn(
"sample.user@example.test",
repr([event.payload for event in service.outbox_events()]),
)
def test_claim_requires_matching_verified_factor(self):
service, store = _service()
preparer = _actor(subject="tenant-admin", roles=("tenant-admin",))
applicant = _actor(subject="new-user", email="sample.user@example.test")
_bootstrap_catalog(service, preparer)
prepared = _prepare_demo_account(
service,
preparer,
email="different@example.test",
)
registration = _complete_registration(service, applicant)
before_outbox = len(service.outbox_events())
with self.assertRaises(ValidationError):
service.claim_prepared_account(
applicant,
registration.session.registration_id,
prepared_account_id=prepared.prepared_account_id,
correlation_id="corr-claim-mismatch",
)
self.assertEqual(
store.prepared_account(prepared.prepared_account_id).status,
PreparedAccountStatus.PENDING,
)
self.assertEqual(store.memberships_for_user(registration.user.user_id), ())
self.assertEqual(len(service.outbox_events()), before_outbox)
self.assertEqual(
service.audit_records()[-1].summary,
"prepared account claim denied: factor mismatch or closed",
)
def test_claim_ignores_expired_factor_evidence(self):
service, store = _service()
preparer = _actor(subject="tenant-admin", roles=("tenant-admin",))
applicant = _actor(subject="new-user", email="sample.user@example.test")
_bootstrap_catalog(service, preparer)
prepared = _prepare_demo_account(service, preparer)
registration = _complete_registration(
service,
applicant,
factor_expires_at=utc_now() - timedelta(days=1),
)
with self.assertRaises(ValidationError):
service.claim_prepared_account(
applicant,
registration.session.registration_id,
prepared_account_id=prepared.prepared_account_id,
correlation_id="corr-claim-expired-factor",
)
self.assertEqual(store.memberships_for_user(registration.user.user_id), ())
self.assertEqual(
service.audit_records()[-1].summary,
"prepared account claim denied: factor mismatch or closed",
)
def test_ambiguous_prepared_account_matches_fail_closed(self):
service, store = _service()
preparer = _actor(subject="tenant-admin", roles=("tenant-admin",))
applicant = _actor(subject="new-user", email="sample.user@example.test")
_bootstrap_catalog(service, preparer)
requirement = _email_requirement()
entitlements = (_membership_entitlement(),)
store.save_prepared_account(
PreparedAccount(
tenant="tenant:coulomb",
required_factor_matches=(requirement,),
entitlements=entitlements,
prepared_by_subject="fixture",
)
)
store.save_prepared_account(
PreparedAccount(
tenant="tenant:coulomb",
required_factor_matches=(requirement,),
entitlements=entitlements,
prepared_by_subject="fixture",
)
)
registration = _complete_registration(service, applicant)
with self.assertRaises(ConflictError):
service.claim_prepared_account(
applicant,
registration.session.registration_id,
correlation_id="corr-ambiguous",
)
self.assertEqual(
service.audit_records()[-1].summary,
"prepared account claim denied: ambiguous match",
)
self.assertEqual(store.memberships_for_user(registration.user.user_id), ())
def test_approval_required_entitlement_blocks_claim(self):
service, store = _service()
preparer = _actor(subject="tenant-admin", roles=("tenant-admin",))
applicant = _actor(subject="new-user", email="sample.user@example.test")
prepared = service.prepare_account(
preparer,
tenant="tenant:coulomb",
required_factor_matches=(_email_requirement(),),
entitlements=(
PreparedEntitlement(
kind=PreparedEntitlementKind.MEMBERSHIP,
tenant="tenant:coulomb",
scope_type="team",
scope_id="team:ops",
role="admin",
requires_approval=True,
),
),
correlation_id="corr-prepare-privileged",
)
registration = _complete_registration(service, applicant)
with self.assertRaises(AuthorizationDenied):
service.claim_prepared_account(
applicant,
registration.session.registration_id,
prepared_account_id=prepared.prepared_account_id,
correlation_id="corr-claim-privileged",
)
self.assertEqual(
store.prepared_account(prepared.prepared_account_id).status,
PreparedAccountStatus.PENDING,
)
self.assertEqual(store.memberships_for_user(registration.user.user_id), ())
self.assertEqual(
service.audit_records()[-1].summary,
"prepared account claim denied: approval required",
)
def test_revoked_and_expired_prepared_accounts_cannot_be_claimed(self):
service, store = _service()
preparer = _actor(subject="tenant-admin", roles=("tenant-admin",))
applicant = _actor(subject="new-user", email="sample.user@example.test")
_bootstrap_catalog(service, preparer)
revoked = _prepare_demo_account(service, preparer)
service.revoke_prepared_account(
preparer,
revoked.prepared_account_id,
correlation_id="corr-revoke",
)
expired = _prepare_demo_account(service, preparer)
service.expire_prepared_account(
preparer,
expired.prepared_account_id,
correlation_id="corr-expire",
)
registration = _complete_registration(service, applicant)
for prepared in (revoked, expired):
with self.assertRaises(ValidationError):
service.claim_prepared_account(
applicant,
registration.session.registration_id,
prepared_account_id=prepared.prepared_account_id,
correlation_id=f"corr-claim-{prepared.prepared_account_id}",
)
self.assertEqual(store.memberships_for_user(registration.user.user_id), ())
self.assertEqual(
service.audit_records()[-1].summary,
"prepared account claim denied: factor mismatch or closed",
)
def test_duplicate_pending_prepared_accounts_are_rejected(self):
service, _ = _service()
preparer = _actor(subject="tenant-admin", roles=("tenant-admin",))
_bootstrap_catalog(service, preparer)
_prepare_demo_account(service, preparer)
with self.assertRaises(ConflictError):
_prepare_demo_account(service, preparer)
def test_weak_factor_requirements_are_rejected(self):
service, _ = _service()
preparer = _actor(subject="tenant-admin", roles=("tenant-admin",))
with self.assertRaises(ValidationError):
service.prepare_account(
preparer,
tenant="tenant:coulomb",
required_factor_matches=(
PreparedFactorRequirement(
factor_type=IdentityFactorType.EMAIL,
normalized_value=" ",
),
),
entitlements=(_membership_entitlement(),),
correlation_id="corr-weak-factor",
)
def test_revoke_and_list_prepared_accounts(self):
service, _ = _service()
preparer = _actor(subject="tenant-admin", roles=("tenant-admin",))
prepared = _prepare_demo_account(service, preparer)
listed = service.list_prepared_accounts(
preparer,
tenant="tenant:coulomb",
correlation_id="corr-list",
)
revoked = service.revoke_prepared_account(
preparer,
prepared.prepared_account_id,
correlation_id="corr-revoke",
)
self.assertEqual(listed[0].prepared_account_id, prepared.prepared_account_id)
self.assertEqual(revoked.status, PreparedAccountStatus.REVOKED)
def _service():
store = InMemoryUserEngineStore()
service = UserEngineService(
store=store,
identity_adapter=FixtureIdentityClaimsAdapter(),
authorization=LocalAuthorizationCheckPort(),
)
return service, store
def _actor(
*,
subject: str = "sample-user",
roles: tuple[str, ...] = ("user",),
email: str = "sample.user@example.test",
):
claims = human_actor_claims(subject=subject, tenant="tenant:coulomb")
claims["roles"] = list(roles)
claims["email"] = email
claims["preferred_username"] = subject
return FixtureIdentityClaimsAdapter().normalize(claims)
def _bootstrap_catalog(service: UserEngineService, actor):
service.register_application(
actor,
sample_application(),
binding=sample_application_binding(),
correlation_id="corr-app",
)
service.publish_catalog(
actor,
Catalog(
catalog_id="demo-prepared-profile",
namespace=sample_catalog().namespace,
version=sample_catalog().version,
owning_application_id=sample_catalog().owning_application_id,
lifecycle=CatalogLifecycle.ACTIVE,
attributes=sample_catalog().attributes,
),
correlation_id="corr-catalog",
)
def _complete_registration(
service: UserEngineService,
actor,
*,
factor_expires_at=None,
):
session = service.start_registration(actor, correlation_id="corr-start")
service.attach_registration_factor(
actor,
session.registration_id,
FactorVerification(
factor_type=IdentityFactorType.EMAIL,
normalized_value="sample.user@example.test",
display_value="sample.user@example.test",
source_system="fixture-email",
evidence_refs=(
CanonEntityReference(
concept="Evidence Source",
identifier="email-proof",
source_system="fixture-email",
),
),
expires_at=factor_expires_at,
),
correlation_id="corr-factor",
)
return service.complete_registration(
actor,
session.registration_id,
correlation_id="corr-complete",
)
def _prepare_demo_account(
service: UserEngineService,
preparer,
*,
email: str = "sample.user@example.test",
):
return service.prepare_account(
preparer,
tenant="tenant:coulomb",
required_factor_matches=(_email_requirement(email=email),),
entitlements=(
PreparedEntitlement(
kind=PreparedEntitlementKind.TENANT_ACCOUNT,
tenant="tenant:coulomb",
tenant_account_status=AccountStatus.ACTIVE,
),
_membership_entitlement(),
PreparedEntitlement(
kind=PreparedEntitlementKind.PROFILE_VALUE,
tenant="tenant:coulomb",
attribute_key="demo.display_density",
value="compact",
profile_scope=ProfileScope.APPLICATION,
profile_scope_id="app.demo",
),
PreparedEntitlement(
kind=PreparedEntitlementKind.ONBOARDING_JOURNEY,
tenant="tenant:coulomb",
onboarding_journey="welcome-demo",
),
),
display_name="Prepared User",
primary_email=email,
correlation_id="corr-prepare",
)
def _email_requirement(
*,
email: str = "sample.user@example.test",
) -> PreparedFactorRequirement:
return PreparedFactorRequirement(
factor_type=IdentityFactorType.EMAIL,
normalized_value=email,
source_system="fixture-email",
)
def _membership_entitlement() -> PreparedEntitlement:
return PreparedEntitlement(
kind=PreparedEntitlementKind.MEMBERSHIP,
tenant="tenant:coulomb",
scope_type="team",
scope_id="team:demo",
role="member",
)
if __name__ == "__main__":
unittest.main()