generated from coulomb/repo-seed
feat: implement prepared account claims
This commit is contained in:
@@ -163,6 +163,7 @@ class _ProtocolOnlyStore:
|
||||
"identity_factors",
|
||||
"memberships",
|
||||
"outbox_events",
|
||||
"prepared_accounts",
|
||||
"profile_values",
|
||||
"registration_sessions",
|
||||
"tenant_accounts",
|
||||
|
||||
420
tests/test_prepared_accounts.py
Normal file
420
tests/test_prepared_accounts.py
Normal file
@@ -0,0 +1,420 @@
|
||||
from datetime import timedelta
|
||||
import unittest
|
||||
|
||||
from user_engine.adapters.local import InMemoryUserEngineStore, LocalAuthorizationCheckPort
|
||||
from user_engine.domain import (
|
||||
AccountStatus,
|
||||
CanonEntityReference,
|
||||
Catalog,
|
||||
CatalogLifecycle,
|
||||
FactorVerification,
|
||||
IdentityFactorType,
|
||||
PreparedAccount,
|
||||
PreparedAccountStatus,
|
||||
PreparedEntitlement,
|
||||
PreparedEntitlementKind,
|
||||
PreparedFactorRequirement,
|
||||
ProfileScope,
|
||||
utc_now,
|
||||
)
|
||||
from user_engine.errors import AuthorizationDenied, ConflictError, ValidationError
|
||||
from user_engine.service import UserEngineService
|
||||
from user_engine.testing.fixtures import (
|
||||
FixtureIdentityClaimsAdapter,
|
||||
human_actor_claims,
|
||||
sample_application,
|
||||
sample_application_binding,
|
||||
sample_catalog,
|
||||
)
|
||||
|
||||
|
||||
class PreparedAccountTests(unittest.TestCase):
|
||||
def test_claim_prepared_account_activates_entitlements(self):
|
||||
service, store = _service()
|
||||
preparer = _actor(subject="tenant-admin", roles=("tenant-admin",))
|
||||
applicant = _actor(subject="new-user", email="sample.user@example.test")
|
||||
_bootstrap_catalog(service, preparer)
|
||||
prepared = _prepare_demo_account(service, preparer)
|
||||
registration = _complete_registration(service, applicant)
|
||||
|
||||
claim = service.claim_prepared_account(
|
||||
applicant,
|
||||
registration.session.registration_id,
|
||||
correlation_id="corr-claim",
|
||||
)
|
||||
|
||||
self.assertEqual(claim.prepared_account.status, PreparedAccountStatus.CLAIMED)
|
||||
self.assertEqual(claim.prepared_account.claimed_by_user_id, claim.user.user_id)
|
||||
self.assertEqual(claim.tenant_accounts[0].status, AccountStatus.ACTIVE)
|
||||
self.assertEqual(claim.memberships[0].scope_id, "team:demo")
|
||||
self.assertEqual(claim.memberships[0].kind, "member")
|
||||
self.assertEqual(claim.profile_values[0].attribute_key, "demo.display_density")
|
||||
self.assertEqual(claim.profile_values[0].value, "compact")
|
||||
self.assertEqual(claim.onboarding_journeys, ("welcome-demo",))
|
||||
self.assertEqual(
|
||||
store.prepared_account(prepared.prepared_account_id).status,
|
||||
PreparedAccountStatus.CLAIMED,
|
||||
)
|
||||
self.assertIn(
|
||||
"prepared_account.onboarding_requested",
|
||||
[event.event_type for event in service.outbox_events()],
|
||||
)
|
||||
self.assertNotIn(
|
||||
"sample.user@example.test",
|
||||
repr([event.payload for event in service.outbox_events()]),
|
||||
)
|
||||
|
||||
def test_claim_requires_matching_verified_factor(self):
|
||||
service, store = _service()
|
||||
preparer = _actor(subject="tenant-admin", roles=("tenant-admin",))
|
||||
applicant = _actor(subject="new-user", email="sample.user@example.test")
|
||||
_bootstrap_catalog(service, preparer)
|
||||
prepared = _prepare_demo_account(
|
||||
service,
|
||||
preparer,
|
||||
email="different@example.test",
|
||||
)
|
||||
registration = _complete_registration(service, applicant)
|
||||
before_outbox = len(service.outbox_events())
|
||||
|
||||
with self.assertRaises(ValidationError):
|
||||
service.claim_prepared_account(
|
||||
applicant,
|
||||
registration.session.registration_id,
|
||||
prepared_account_id=prepared.prepared_account_id,
|
||||
correlation_id="corr-claim-mismatch",
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
store.prepared_account(prepared.prepared_account_id).status,
|
||||
PreparedAccountStatus.PENDING,
|
||||
)
|
||||
self.assertEqual(store.memberships_for_user(registration.user.user_id), ())
|
||||
self.assertEqual(len(service.outbox_events()), before_outbox)
|
||||
self.assertEqual(
|
||||
service.audit_records()[-1].summary,
|
||||
"prepared account claim denied: factor mismatch or closed",
|
||||
)
|
||||
|
||||
def test_claim_ignores_expired_factor_evidence(self):
|
||||
service, store = _service()
|
||||
preparer = _actor(subject="tenant-admin", roles=("tenant-admin",))
|
||||
applicant = _actor(subject="new-user", email="sample.user@example.test")
|
||||
_bootstrap_catalog(service, preparer)
|
||||
prepared = _prepare_demo_account(service, preparer)
|
||||
registration = _complete_registration(
|
||||
service,
|
||||
applicant,
|
||||
factor_expires_at=utc_now() - timedelta(days=1),
|
||||
)
|
||||
|
||||
with self.assertRaises(ValidationError):
|
||||
service.claim_prepared_account(
|
||||
applicant,
|
||||
registration.session.registration_id,
|
||||
prepared_account_id=prepared.prepared_account_id,
|
||||
correlation_id="corr-claim-expired-factor",
|
||||
)
|
||||
|
||||
self.assertEqual(store.memberships_for_user(registration.user.user_id), ())
|
||||
self.assertEqual(
|
||||
service.audit_records()[-1].summary,
|
||||
"prepared account claim denied: factor mismatch or closed",
|
||||
)
|
||||
|
||||
def test_ambiguous_prepared_account_matches_fail_closed(self):
|
||||
service, store = _service()
|
||||
preparer = _actor(subject="tenant-admin", roles=("tenant-admin",))
|
||||
applicant = _actor(subject="new-user", email="sample.user@example.test")
|
||||
_bootstrap_catalog(service, preparer)
|
||||
requirement = _email_requirement()
|
||||
entitlements = (_membership_entitlement(),)
|
||||
store.save_prepared_account(
|
||||
PreparedAccount(
|
||||
tenant="tenant:coulomb",
|
||||
required_factor_matches=(requirement,),
|
||||
entitlements=entitlements,
|
||||
prepared_by_subject="fixture",
|
||||
)
|
||||
)
|
||||
store.save_prepared_account(
|
||||
PreparedAccount(
|
||||
tenant="tenant:coulomb",
|
||||
required_factor_matches=(requirement,),
|
||||
entitlements=entitlements,
|
||||
prepared_by_subject="fixture",
|
||||
)
|
||||
)
|
||||
registration = _complete_registration(service, applicant)
|
||||
|
||||
with self.assertRaises(ConflictError):
|
||||
service.claim_prepared_account(
|
||||
applicant,
|
||||
registration.session.registration_id,
|
||||
correlation_id="corr-ambiguous",
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
service.audit_records()[-1].summary,
|
||||
"prepared account claim denied: ambiguous match",
|
||||
)
|
||||
self.assertEqual(store.memberships_for_user(registration.user.user_id), ())
|
||||
|
||||
def test_approval_required_entitlement_blocks_claim(self):
|
||||
service, store = _service()
|
||||
preparer = _actor(subject="tenant-admin", roles=("tenant-admin",))
|
||||
applicant = _actor(subject="new-user", email="sample.user@example.test")
|
||||
prepared = service.prepare_account(
|
||||
preparer,
|
||||
tenant="tenant:coulomb",
|
||||
required_factor_matches=(_email_requirement(),),
|
||||
entitlements=(
|
||||
PreparedEntitlement(
|
||||
kind=PreparedEntitlementKind.MEMBERSHIP,
|
||||
tenant="tenant:coulomb",
|
||||
scope_type="team",
|
||||
scope_id="team:ops",
|
||||
role="admin",
|
||||
requires_approval=True,
|
||||
),
|
||||
),
|
||||
correlation_id="corr-prepare-privileged",
|
||||
)
|
||||
registration = _complete_registration(service, applicant)
|
||||
|
||||
with self.assertRaises(AuthorizationDenied):
|
||||
service.claim_prepared_account(
|
||||
applicant,
|
||||
registration.session.registration_id,
|
||||
prepared_account_id=prepared.prepared_account_id,
|
||||
correlation_id="corr-claim-privileged",
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
store.prepared_account(prepared.prepared_account_id).status,
|
||||
PreparedAccountStatus.PENDING,
|
||||
)
|
||||
self.assertEqual(store.memberships_for_user(registration.user.user_id), ())
|
||||
self.assertEqual(
|
||||
service.audit_records()[-1].summary,
|
||||
"prepared account claim denied: approval required",
|
||||
)
|
||||
|
||||
def test_revoked_and_expired_prepared_accounts_cannot_be_claimed(self):
|
||||
service, store = _service()
|
||||
preparer = _actor(subject="tenant-admin", roles=("tenant-admin",))
|
||||
applicant = _actor(subject="new-user", email="sample.user@example.test")
|
||||
_bootstrap_catalog(service, preparer)
|
||||
revoked = _prepare_demo_account(service, preparer)
|
||||
service.revoke_prepared_account(
|
||||
preparer,
|
||||
revoked.prepared_account_id,
|
||||
correlation_id="corr-revoke",
|
||||
)
|
||||
expired = _prepare_demo_account(service, preparer)
|
||||
service.expire_prepared_account(
|
||||
preparer,
|
||||
expired.prepared_account_id,
|
||||
correlation_id="corr-expire",
|
||||
)
|
||||
registration = _complete_registration(service, applicant)
|
||||
|
||||
for prepared in (revoked, expired):
|
||||
with self.assertRaises(ValidationError):
|
||||
service.claim_prepared_account(
|
||||
applicant,
|
||||
registration.session.registration_id,
|
||||
prepared_account_id=prepared.prepared_account_id,
|
||||
correlation_id=f"corr-claim-{prepared.prepared_account_id}",
|
||||
)
|
||||
|
||||
self.assertEqual(store.memberships_for_user(registration.user.user_id), ())
|
||||
self.assertEqual(
|
||||
service.audit_records()[-1].summary,
|
||||
"prepared account claim denied: factor mismatch or closed",
|
||||
)
|
||||
|
||||
def test_duplicate_pending_prepared_accounts_are_rejected(self):
|
||||
service, _ = _service()
|
||||
preparer = _actor(subject="tenant-admin", roles=("tenant-admin",))
|
||||
_bootstrap_catalog(service, preparer)
|
||||
_prepare_demo_account(service, preparer)
|
||||
|
||||
with self.assertRaises(ConflictError):
|
||||
_prepare_demo_account(service, preparer)
|
||||
|
||||
def test_weak_factor_requirements_are_rejected(self):
|
||||
service, _ = _service()
|
||||
preparer = _actor(subject="tenant-admin", roles=("tenant-admin",))
|
||||
|
||||
with self.assertRaises(ValidationError):
|
||||
service.prepare_account(
|
||||
preparer,
|
||||
tenant="tenant:coulomb",
|
||||
required_factor_matches=(
|
||||
PreparedFactorRequirement(
|
||||
factor_type=IdentityFactorType.EMAIL,
|
||||
normalized_value=" ",
|
||||
),
|
||||
),
|
||||
entitlements=(_membership_entitlement(),),
|
||||
correlation_id="corr-weak-factor",
|
||||
)
|
||||
|
||||
def test_revoke_and_list_prepared_accounts(self):
|
||||
service, _ = _service()
|
||||
preparer = _actor(subject="tenant-admin", roles=("tenant-admin",))
|
||||
prepared = _prepare_demo_account(service, preparer)
|
||||
|
||||
listed = service.list_prepared_accounts(
|
||||
preparer,
|
||||
tenant="tenant:coulomb",
|
||||
correlation_id="corr-list",
|
||||
)
|
||||
revoked = service.revoke_prepared_account(
|
||||
preparer,
|
||||
prepared.prepared_account_id,
|
||||
correlation_id="corr-revoke",
|
||||
)
|
||||
|
||||
self.assertEqual(listed[0].prepared_account_id, prepared.prepared_account_id)
|
||||
self.assertEqual(revoked.status, PreparedAccountStatus.REVOKED)
|
||||
|
||||
|
||||
def _service():
|
||||
store = InMemoryUserEngineStore()
|
||||
service = UserEngineService(
|
||||
store=store,
|
||||
identity_adapter=FixtureIdentityClaimsAdapter(),
|
||||
authorization=LocalAuthorizationCheckPort(),
|
||||
)
|
||||
return service, store
|
||||
|
||||
|
||||
def _actor(
|
||||
*,
|
||||
subject: str = "sample-user",
|
||||
roles: tuple[str, ...] = ("user",),
|
||||
email: str = "sample.user@example.test",
|
||||
):
|
||||
claims = human_actor_claims(subject=subject, tenant="tenant:coulomb")
|
||||
claims["roles"] = list(roles)
|
||||
claims["email"] = email
|
||||
claims["preferred_username"] = subject
|
||||
return FixtureIdentityClaimsAdapter().normalize(claims)
|
||||
|
||||
|
||||
def _bootstrap_catalog(service: UserEngineService, actor):
|
||||
service.register_application(
|
||||
actor,
|
||||
sample_application(),
|
||||
binding=sample_application_binding(),
|
||||
correlation_id="corr-app",
|
||||
)
|
||||
service.publish_catalog(
|
||||
actor,
|
||||
Catalog(
|
||||
catalog_id="demo-prepared-profile",
|
||||
namespace=sample_catalog().namespace,
|
||||
version=sample_catalog().version,
|
||||
owning_application_id=sample_catalog().owning_application_id,
|
||||
lifecycle=CatalogLifecycle.ACTIVE,
|
||||
attributes=sample_catalog().attributes,
|
||||
),
|
||||
correlation_id="corr-catalog",
|
||||
)
|
||||
|
||||
|
||||
def _complete_registration(
|
||||
service: UserEngineService,
|
||||
actor,
|
||||
*,
|
||||
factor_expires_at=None,
|
||||
):
|
||||
session = service.start_registration(actor, correlation_id="corr-start")
|
||||
service.attach_registration_factor(
|
||||
actor,
|
||||
session.registration_id,
|
||||
FactorVerification(
|
||||
factor_type=IdentityFactorType.EMAIL,
|
||||
normalized_value="sample.user@example.test",
|
||||
display_value="sample.user@example.test",
|
||||
source_system="fixture-email",
|
||||
evidence_refs=(
|
||||
CanonEntityReference(
|
||||
concept="Evidence Source",
|
||||
identifier="email-proof",
|
||||
source_system="fixture-email",
|
||||
),
|
||||
),
|
||||
expires_at=factor_expires_at,
|
||||
),
|
||||
correlation_id="corr-factor",
|
||||
)
|
||||
return service.complete_registration(
|
||||
actor,
|
||||
session.registration_id,
|
||||
correlation_id="corr-complete",
|
||||
)
|
||||
|
||||
|
||||
def _prepare_demo_account(
|
||||
service: UserEngineService,
|
||||
preparer,
|
||||
*,
|
||||
email: str = "sample.user@example.test",
|
||||
):
|
||||
return service.prepare_account(
|
||||
preparer,
|
||||
tenant="tenant:coulomb",
|
||||
required_factor_matches=(_email_requirement(email=email),),
|
||||
entitlements=(
|
||||
PreparedEntitlement(
|
||||
kind=PreparedEntitlementKind.TENANT_ACCOUNT,
|
||||
tenant="tenant:coulomb",
|
||||
tenant_account_status=AccountStatus.ACTIVE,
|
||||
),
|
||||
_membership_entitlement(),
|
||||
PreparedEntitlement(
|
||||
kind=PreparedEntitlementKind.PROFILE_VALUE,
|
||||
tenant="tenant:coulomb",
|
||||
attribute_key="demo.display_density",
|
||||
value="compact",
|
||||
profile_scope=ProfileScope.APPLICATION,
|
||||
profile_scope_id="app.demo",
|
||||
),
|
||||
PreparedEntitlement(
|
||||
kind=PreparedEntitlementKind.ONBOARDING_JOURNEY,
|
||||
tenant="tenant:coulomb",
|
||||
onboarding_journey="welcome-demo",
|
||||
),
|
||||
),
|
||||
display_name="Prepared User",
|
||||
primary_email=email,
|
||||
correlation_id="corr-prepare",
|
||||
)
|
||||
|
||||
|
||||
def _email_requirement(
|
||||
*,
|
||||
email: str = "sample.user@example.test",
|
||||
) -> PreparedFactorRequirement:
|
||||
return PreparedFactorRequirement(
|
||||
factor_type=IdentityFactorType.EMAIL,
|
||||
normalized_value=email,
|
||||
source_system="fixture-email",
|
||||
)
|
||||
|
||||
|
||||
def _membership_entitlement() -> PreparedEntitlement:
|
||||
return PreparedEntitlement(
|
||||
kind=PreparedEntitlementKind.MEMBERSHIP,
|
||||
tenant="tenant:coulomb",
|
||||
scope_type="team",
|
||||
scope_id="team:demo",
|
||||
role="member",
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user