"""Tests for preparing, claiming, revoking and expiring prepared accounts.

Every test builds a fresh ``UserEngineService`` over an in-memory store (see
``_service``) and drives it through the public service methods only.
"""

from datetime import timedelta
import unittest

from user_engine.adapters.local import (
    InMemoryUserEngineStore,
    LocalAuthorizationCheckPort,
)
from user_engine.domain import (
    AccountStatus,
    CanonEntityReference,
    Catalog,
    CatalogLifecycle,
    FactorVerification,
    IdentityFactorType,
    PreparedAccount,
    PreparedAccountStatus,
    PreparedEntitlement,
    PreparedEntitlementKind,
    PreparedFactorRequirement,
    ProfileScope,
    utc_now,
)
from user_engine.errors import AuthorizationDenied, ConflictError, ValidationError
from user_engine.service import UserEngineService
from user_engine.testing.fixtures import (
    FixtureIdentityClaimsAdapter,
    human_actor_claims,
    sample_application,
    sample_application_binding,
    sample_catalog,
)

# Tenant used by every actor, prepared account and entitlement in this module.
_TENANT = "tenant:coulomb"

# Email carried by the default applicant and by the factor attached during
# registration; also the default factor value a prepared account requires.
_SAMPLE_EMAIL = "sample.user@example.test"

# Audit summary the tests expect when a claim is refused because the factor
# does not match, the factor evidence expired, or the account is closed.
_DENIED_MISMATCH_OR_CLOSED = (
    "prepared account claim denied: factor mismatch or closed"
)


class PreparedAccountTests(unittest.TestCase):
    """Behavioural tests for the prepared-account flow of ``UserEngineService``."""

    def test_claim_prepared_account_activates_entitlements(self):
        """A matching applicant claims the account and receives its entitlements."""
        service, store = _service()
        preparer = _actor(subject="tenant-admin", roles=("tenant-admin",))
        applicant = _actor(subject="new-user", email=_SAMPLE_EMAIL)
        _bootstrap_catalog(service, preparer)
        prepared = _prepare_demo_account(service, preparer)
        registration = _complete_registration(service, applicant)

        # No prepared_account_id is passed: the service has to locate the
        # prepared account for this registration itself.
        claim = service.claim_prepared_account(
            applicant,
            registration.session.registration_id,
            correlation_id="corr-claim",
        )

        self.assertEqual(claim.prepared_account.status, PreparedAccountStatus.CLAIMED)
        self.assertEqual(claim.prepared_account.claimed_by_user_id, claim.user.user_id)
        self.assertEqual(claim.tenant_accounts[0].status, AccountStatus.ACTIVE)
        self.assertEqual(claim.memberships[0].scope_id, "team:demo")
        self.assertEqual(claim.memberships[0].kind, "member")
        self.assertEqual(
            claim.profile_values[0].attribute_key,
            "demo.display_density",
        )
        self.assertEqual(claim.profile_values[0].value, "compact")
        self.assertEqual(claim.onboarding_journeys, ("welcome-demo",))
        self.assertEqual(
            store.prepared_account(prepared.prepared_account_id).status,
            PreparedAccountStatus.CLAIMED,
        )
        self.assertIn(
            "prepared_account.onboarding_requested",
            [event.event_type for event in service.outbox_events()],
        )
        # The applicant's email must not appear in any outbox payload.
        self.assertNotIn(
            _SAMPLE_EMAIL,
            repr([event.payload for event in service.outbox_events()]),
        )

    def test_claim_requires_matching_verified_factor(self):
        """A claim is refused when the prepared factor value differs from the verified one."""
        service, store = _service()
        preparer = _actor(subject="tenant-admin", roles=("tenant-admin",))
        applicant = _actor(subject="new-user", email=_SAMPLE_EMAIL)
        _bootstrap_catalog(service, preparer)
        prepared = _prepare_demo_account(
            service,
            preparer,
            email="different@example.test",
        )
        registration = _complete_registration(service, applicant)
        before_outbox = len(service.outbox_events())

        with self.assertRaises(ValidationError):
            service.claim_prepared_account(
                applicant,
                registration.session.registration_id,
                prepared_account_id=prepared.prepared_account_id,
                correlation_id="corr-claim-mismatch",
            )

        # The refused claim must leave no trace besides the audit record.
        self.assertEqual(
            store.prepared_account(prepared.prepared_account_id).status,
            PreparedAccountStatus.PENDING,
        )
        self.assertEqual(store.memberships_for_user(registration.user.user_id), ())
        self.assertEqual(len(service.outbox_events()), before_outbox)
        self.assertEqual(
            service.audit_records()[-1].summary,
            _DENIED_MISMATCH_OR_CLOSED,
        )

    def test_claim_ignores_expired_factor_evidence(self):
        """A factor whose ``expires_at`` lies in the past does not satisfy a claim."""
        service, store = _service()
        preparer = _actor(subject="tenant-admin", roles=("tenant-admin",))
        applicant = _actor(subject="new-user", email=_SAMPLE_EMAIL)
        _bootstrap_catalog(service, preparer)
        prepared = _prepare_demo_account(service, preparer)
        registration = _complete_registration(
            service,
            applicant,
            factor_expires_at=utc_now() - timedelta(days=1),
        )

        with self.assertRaises(ValidationError):
            service.claim_prepared_account(
                applicant,
                registration.session.registration_id,
                prepared_account_id=prepared.prepared_account_id,
                correlation_id="corr-claim-expired-factor",
            )

        self.assertEqual(store.memberships_for_user(registration.user.user_id), ())
        self.assertEqual(
            service.audit_records()[-1].summary,
            _DENIED_MISMATCH_OR_CLOSED,
        )

    def test_ambiguous_prepared_account_matches_fail_closed(self):
        """Two prepared accounts matching the same factor make the claim a conflict."""
        service, store = _service()
        preparer = _actor(subject="tenant-admin", roles=("tenant-admin",))
        applicant = _actor(subject="new-user", email=_SAMPLE_EMAIL)
        _bootstrap_catalog(service, preparer)
        requirement = _email_requirement()
        entitlements = (_membership_entitlement(),)
        # Written straight to the store: test_duplicate_pending_prepared_accounts_are_rejected
        # shows service.prepare_account() refusing a second identical account.
        for _ in range(2):
            store.save_prepared_account(
                PreparedAccount(
                    tenant=_TENANT,
                    required_factor_matches=(requirement,),
                    entitlements=entitlements,
                    prepared_by_subject="fixture",
                )
            )
        registration = _complete_registration(service, applicant)

        with self.assertRaises(ConflictError):
            service.claim_prepared_account(
                applicant,
                registration.session.registration_id,
                correlation_id="corr-ambiguous",
            )

        self.assertEqual(
            service.audit_records()[-1].summary,
            "prepared account claim denied: ambiguous match",
        )
        self.assertEqual(store.memberships_for_user(registration.user.user_id), ())

    def test_approval_required_entitlement_blocks_claim(self):
        """An entitlement flagged ``requires_approval`` makes the claim raise AuthorizationDenied."""
        service, store = _service()
        preparer = _actor(subject="tenant-admin", roles=("tenant-admin",))
        applicant = _actor(subject="new-user", email=_SAMPLE_EMAIL)
        prepared = service.prepare_account(
            preparer,
            tenant=_TENANT,
            required_factor_matches=(_email_requirement(),),
            entitlements=(
                PreparedEntitlement(
                    kind=PreparedEntitlementKind.MEMBERSHIP,
                    tenant=_TENANT,
                    scope_type="team",
                    scope_id="team:ops",
                    role="admin",
                    requires_approval=True,
                ),
            ),
            correlation_id="corr-prepare-privileged",
        )
        registration = _complete_registration(service, applicant)

        with self.assertRaises(AuthorizationDenied):
            service.claim_prepared_account(
                applicant,
                registration.session.registration_id,
                prepared_account_id=prepared.prepared_account_id,
                correlation_id="corr-claim-privileged",
            )

        self.assertEqual(
            store.prepared_account(prepared.prepared_account_id).status,
            PreparedAccountStatus.PENDING,
        )
        self.assertEqual(store.memberships_for_user(registration.user.user_id), ())
        self.assertEqual(
            service.audit_records()[-1].summary,
            "prepared account claim denied: approval required",
        )

    def test_revoked_and_expired_prepared_accounts_cannot_be_claimed(self):
        """Neither a revoked nor an expired prepared account can be claimed."""
        service, store = _service()
        preparer = _actor(subject="tenant-admin", roles=("tenant-admin",))
        applicant = _actor(subject="new-user", email=_SAMPLE_EMAIL)
        _bootstrap_catalog(service, preparer)
        revoked = _prepare_demo_account(service, preparer)
        service.revoke_prepared_account(
            preparer,
            revoked.prepared_account_id,
            correlation_id="corr-revoke",
        )
        expired = _prepare_demo_account(service, preparer)
        service.expire_prepared_account(
            preparer,
            expired.prepared_account_id,
            correlation_id="corr-expire",
        )
        registration = _complete_registration(service, applicant)

        # subTest keeps the two closed states independent: a failure for one
        # is reported with its label and does not skip the other.
        for state, prepared in (("revoked", revoked), ("expired", expired)):
            with self.subTest(state=state):
                with self.assertRaises(ValidationError):
                    service.claim_prepared_account(
                        applicant,
                        registration.session.registration_id,
                        prepared_account_id=prepared.prepared_account_id,
                        correlation_id=f"corr-claim-{prepared.prepared_account_id}",
                    )

        # Checked once both attempts were made: no membership was granted and
        # the most recent audit record is the denial.
        self.assertEqual(store.memberships_for_user(registration.user.user_id), ())
        self.assertEqual(
            service.audit_records()[-1].summary,
            _DENIED_MISMATCH_OR_CLOSED,
        )

    def test_duplicate_pending_prepared_accounts_are_rejected(self):
        """Preparing the same demo account twice raises ConflictError."""
        service, _ = _service()
        preparer = _actor(subject="tenant-admin", roles=("tenant-admin",))
        _bootstrap_catalog(service, preparer)
        _prepare_demo_account(service, preparer)

        with self.assertRaises(ConflictError):
            _prepare_demo_account(service, preparer)

    def test_weak_factor_requirements_are_rejected(self):
        """A factor requirement whose value is only whitespace is rejected."""
        service, _ = _service()
        preparer = _actor(subject="tenant-admin", roles=("tenant-admin",))

        with self.assertRaises(ValidationError):
            service.prepare_account(
                preparer,
                tenant=_TENANT,
                required_factor_matches=(
                    PreparedFactorRequirement(
                        factor_type=IdentityFactorType.EMAIL,
                        normalized_value=" ",
                    ),
                ),
                entitlements=(_membership_entitlement(),),
                correlation_id="corr-weak-factor",
            )

    def test_revoke_and_list_prepared_accounts(self):
        """A prepared account is listed for its tenant and can then be revoked."""
        service, _ = _service()
        preparer = _actor(subject="tenant-admin", roles=("tenant-admin",))
        prepared = _prepare_demo_account(service, preparer)

        # Listing happens before the revoke, so it sees the pending account.
        listed = service.list_prepared_accounts(
            preparer,
            tenant=_TENANT,
            correlation_id="corr-list",
        )
        revoked = service.revoke_prepared_account(
            preparer,
            prepared.prepared_account_id,
            correlation_id="corr-revoke",
        )

        self.assertEqual(listed[0].prepared_account_id, prepared.prepared_account_id)
        self.assertEqual(revoked.status, PreparedAccountStatus.REVOKED)


def _service():
    """Return a ``(service, store)`` pair wired with in-memory/local adapters.

    The store is returned alongside the service so tests can inspect and seed
    persisted state directly.
    """
    store = InMemoryUserEngineStore()
    service = UserEngineService(
        store=store,
        identity_adapter=FixtureIdentityClaimsAdapter(),
        authorization=LocalAuthorizationCheckPort(),
    )
    return service, store


def _actor(
    *,
    subject: str = "sample-user",
    roles: tuple[str, ...] = ("user",),
    email: str = _SAMPLE_EMAIL,
):
    """Build a normalized actor for the test tenant.

    Starts from ``human_actor_claims`` for ``subject``, overrides the
    ``roles``, ``email`` and ``preferred_username`` claims, and returns the
    result of ``FixtureIdentityClaimsAdapter().normalize``.
    """
    claims = human_actor_claims(subject=subject, tenant=_TENANT)
    claims["roles"] = list(roles)
    claims["email"] = email
    claims["preferred_username"] = subject
    return FixtureIdentityClaimsAdapter().normalize(claims)


def _bootstrap_catalog(service: UserEngineService, actor):
    """Register the sample application and publish an ACTIVE demo catalog.

    The published catalog copies namespace, version, owning application and
    attributes from ``sample_catalog()`` but uses the id
    ``"demo-prepared-profile"`` and the ACTIVE lifecycle.
    """
    service.register_application(
        actor,
        sample_application(),
        binding=sample_application_binding(),
        correlation_id="corr-app",
    )
    # Build the fixture catalog once instead of once per copied attribute.
    template = sample_catalog()
    service.publish_catalog(
        actor,
        Catalog(
            catalog_id="demo-prepared-profile",
            namespace=template.namespace,
            version=template.version,
            owning_application_id=template.owning_application_id,
            lifecycle=CatalogLifecycle.ACTIVE,
            attributes=template.attributes,
        ),
        correlation_id="corr-catalog",
    )


def _complete_registration(
    service: UserEngineService,
    actor,
    *,
    factor_expires_at=None,
):
    """Run a registration for ``actor`` with one verified email factor.

    Starts a registration session, attaches an EMAIL ``FactorVerification``
    for the sample email (with ``factor_expires_at`` as its ``expires_at``),
    and returns whatever ``service.complete_registration`` returns.
    """
    session = service.start_registration(actor, correlation_id="corr-start")
    evidence = CanonEntityReference(
        concept="Evidence Source",
        identifier="email-proof",
        source_system="fixture-email",
    )
    service.attach_registration_factor(
        actor,
        session.registration_id,
        FactorVerification(
            factor_type=IdentityFactorType.EMAIL,
            normalized_value=_SAMPLE_EMAIL,
            display_value=_SAMPLE_EMAIL,
            source_system="fixture-email",
            evidence_refs=(evidence,),
            expires_at=factor_expires_at,
        ),
        correlation_id="corr-factor",
    )
    return service.complete_registration(
        actor,
        session.registration_id,
        correlation_id="corr-complete",
    )


def _prepare_demo_account(
    service: UserEngineService,
    preparer,
    *,
    email: str = _SAMPLE_EMAIL,
):
    """Prepare the demo account for ``email`` and return the service's result.

    The account requires an email factor equal to ``email`` and carries four
    entitlements: an ACTIVE tenant account, the demo team membership, a
    ``demo.display_density`` profile value, and the ``welcome-demo``
    onboarding journey.
    """
    entitlements = (
        PreparedEntitlement(
            kind=PreparedEntitlementKind.TENANT_ACCOUNT,
            tenant=_TENANT,
            tenant_account_status=AccountStatus.ACTIVE,
        ),
        _membership_entitlement(),
        PreparedEntitlement(
            kind=PreparedEntitlementKind.PROFILE_VALUE,
            tenant=_TENANT,
            attribute_key="demo.display_density",
            value="compact",
            profile_scope=ProfileScope.APPLICATION,
            profile_scope_id="app.demo",
        ),
        PreparedEntitlement(
            kind=PreparedEntitlementKind.ONBOARDING_JOURNEY,
            tenant=_TENANT,
            onboarding_journey="welcome-demo",
        ),
    )
    return service.prepare_account(
        preparer,
        tenant=_TENANT,
        required_factor_matches=(_email_requirement(email=email),),
        entitlements=entitlements,
        display_name="Prepared User",
        primary_email=email,
        correlation_id="corr-prepare",
    )


def _email_requirement(
    *,
    email: str = _SAMPLE_EMAIL,
) -> PreparedFactorRequirement:
    """Return an EMAIL factor requirement for ``email`` from ``fixture-email``."""
    return PreparedFactorRequirement(
        factor_type=IdentityFactorType.EMAIL,
        normalized_value=email,
        source_system="fixture-email",
    )


def _membership_entitlement() -> PreparedEntitlement:
    """Return the ``member`` membership entitlement for ``team:demo``."""
    return PreparedEntitlement(
        kind=PreparedEntitlementKind.MEMBERSHIP,
        tenant=_TENANT,
        scope_type="team",
        scope_id="team:demo",
        role="member",
    )


if __name__ == "__main__":
    unittest.main()