# Generated from coulomb/repo-seed (421 lines, 15 KiB, Python).
from datetime import timedelta
|
|
import unittest
|
|
|
|
from user_engine.adapters.local import InMemoryUserEngineStore, LocalAuthorizationCheckPort
|
|
from user_engine.domain import (
|
|
AccountStatus,
|
|
CanonEntityReference,
|
|
Catalog,
|
|
CatalogLifecycle,
|
|
FactorVerification,
|
|
IdentityFactorType,
|
|
PreparedAccount,
|
|
PreparedAccountStatus,
|
|
PreparedEntitlement,
|
|
PreparedEntitlementKind,
|
|
PreparedFactorRequirement,
|
|
ProfileScope,
|
|
utc_now,
|
|
)
|
|
from user_engine.errors import AuthorizationDenied, ConflictError, ValidationError
|
|
from user_engine.service import UserEngineService
|
|
from user_engine.testing.fixtures import (
|
|
FixtureIdentityClaimsAdapter,
|
|
human_actor_claims,
|
|
sample_application,
|
|
sample_application_binding,
|
|
sample_catalog,
|
|
)
|
|
|
|
|
|
class PreparedAccountTests(unittest.TestCase):
    """Service-level tests for preparing, claiming, revoking and expiring prepared accounts."""

    def test_claim_prepared_account_activates_entitlements(self):
        """A successful claim marks the account CLAIMED and returns the prepared entitlements."""
        service, store = _service()
        admin = _actor(subject="tenant-admin", roles=("tenant-admin",))
        newcomer = _actor(subject="new-user", email="sample.user@example.test")
        _bootstrap_catalog(service, admin)
        prepared = _prepare_demo_account(service, admin)
        registration = _complete_registration(service, newcomer)

        claim = service.claim_prepared_account(
            newcomer,
            registration.session.registration_id,
            correlation_id="corr-claim",
        )

        claimed = claim.prepared_account
        self.assertEqual(claimed.status, PreparedAccountStatus.CLAIMED)
        self.assertEqual(claimed.claimed_by_user_id, claim.user.user_id)
        self.assertEqual(claim.tenant_accounts[0].status, AccountStatus.ACTIVE)

        membership = claim.memberships[0]
        self.assertEqual(membership.scope_id, "team:demo")
        self.assertEqual(membership.kind, "member")

        profile_value = claim.profile_values[0]
        self.assertEqual(profile_value.attribute_key, "demo.display_density")
        self.assertEqual(profile_value.value, "compact")

        self.assertEqual(claim.onboarding_journeys, ("welcome-demo",))

        stored = store.prepared_account(prepared.prepared_account_id)
        self.assertEqual(stored.status, PreparedAccountStatus.CLAIMED)

        emitted_types = [evt.event_type for evt in service.outbox_events()]
        self.assertIn("prepared_account.onboarding_requested", emitted_types)

        # The applicant's email must not show up in the repr of any outbox payload.
        payload_dump = repr([evt.payload for evt in service.outbox_events()])
        self.assertNotIn("sample.user@example.test", payload_dump)

    def test_claim_requires_matching_verified_factor(self):
        """A claim is rejected when the prepared email differs from the verified one."""
        service, store = _service()
        admin = _actor(subject="tenant-admin", roles=("tenant-admin",))
        newcomer = _actor(subject="new-user", email="sample.user@example.test")
        _bootstrap_catalog(service, admin)
        prepared = _prepare_demo_account(
            service,
            admin,
            email="different@example.test",
        )
        registration = _complete_registration(service, newcomer)
        outbox_size_before = len(service.outbox_events())

        with self.assertRaises(ValidationError):
            service.claim_prepared_account(
                newcomer,
                registration.session.registration_id,
                prepared_account_id=prepared.prepared_account_id,
                correlation_id="corr-claim-mismatch",
            )

        # The rejected claim must leave the account, memberships and outbox untouched.
        stored = store.prepared_account(prepared.prepared_account_id)
        self.assertEqual(stored.status, PreparedAccountStatus.PENDING)
        self.assertEqual(store.memberships_for_user(registration.user.user_id), ())
        self.assertEqual(len(service.outbox_events()), outbox_size_before)
        last_audit = service.audit_records()[-1]
        self.assertEqual(
            last_audit.summary,
            "prepared account claim denied: factor mismatch or closed",
        )

    def test_claim_ignores_expired_factor_evidence(self):
        """A factor whose expiry lies one day in the past does not satisfy the claim."""
        service, store = _service()
        admin = _actor(subject="tenant-admin", roles=("tenant-admin",))
        newcomer = _actor(subject="new-user", email="sample.user@example.test")
        _bootstrap_catalog(service, admin)
        prepared = _prepare_demo_account(service, admin)
        already_expired = utc_now() - timedelta(days=1)
        registration = _complete_registration(
            service,
            newcomer,
            factor_expires_at=already_expired,
        )

        with self.assertRaises(ValidationError):
            service.claim_prepared_account(
                newcomer,
                registration.session.registration_id,
                prepared_account_id=prepared.prepared_account_id,
                correlation_id="corr-claim-expired-factor",
            )

        self.assertEqual(store.memberships_for_user(registration.user.user_id), ())
        last_audit = service.audit_records()[-1]
        self.assertEqual(
            last_audit.summary,
            "prepared account claim denied: factor mismatch or closed",
        )

    def test_ambiguous_prepared_account_matches_fail_closed(self):
        """Two stored accounts with the same requirement make an unqualified claim conflict."""
        service, store = _service()
        admin = _actor(subject="tenant-admin", roles=("tenant-admin",))
        newcomer = _actor(subject="new-user", email="sample.user@example.test")
        _bootstrap_catalog(service, admin)
        requirement = _email_requirement()
        entitlements = (_membership_entitlement(),)
        # Both accounts are saved through the store rather than service.prepare_account.
        for _ in range(2):
            store.save_prepared_account(
                PreparedAccount(
                    tenant="tenant:coulomb",
                    required_factor_matches=(requirement,),
                    entitlements=entitlements,
                    prepared_by_subject="fixture",
                )
            )
        registration = _complete_registration(service, newcomer)

        with self.assertRaises(ConflictError):
            service.claim_prepared_account(
                newcomer,
                registration.session.registration_id,
                correlation_id="corr-ambiguous",
            )

        last_audit = service.audit_records()[-1]
        self.assertEqual(
            last_audit.summary,
            "prepared account claim denied: ambiguous match",
        )
        self.assertEqual(store.memberships_for_user(registration.user.user_id), ())

    def test_approval_required_entitlement_blocks_claim(self):
        """An entitlement flagged ``requires_approval`` makes the claim raise AuthorizationDenied."""
        service, store = _service()
        admin = _actor(subject="tenant-admin", roles=("tenant-admin",))
        newcomer = _actor(subject="new-user", email="sample.user@example.test")
        privileged_membership = PreparedEntitlement(
            kind=PreparedEntitlementKind.MEMBERSHIP,
            tenant="tenant:coulomb",
            scope_type="team",
            scope_id="team:ops",
            role="admin",
            requires_approval=True,
        )
        prepared = service.prepare_account(
            admin,
            tenant="tenant:coulomb",
            required_factor_matches=(_email_requirement(),),
            entitlements=(privileged_membership,),
            correlation_id="corr-prepare-privileged",
        ) if False else service.prepare_account(
            admin,
            tenant="tenant:coulomb",
            required_factor_matches=(_email_requirement(),),
            entitlements=(privileged_membership,),
            correlation_id="corr-prepare-privileged",
        )
        registration = _complete_registration(service, newcomer)

        with self.assertRaises(AuthorizationDenied):
            service.claim_prepared_account(
                newcomer,
                registration.session.registration_id,
                prepared_account_id=prepared.prepared_account_id,
                correlation_id="corr-claim-privileged",
            )

        stored = store.prepared_account(prepared.prepared_account_id)
        self.assertEqual(stored.status, PreparedAccountStatus.PENDING)
        self.assertEqual(store.memberships_for_user(registration.user.user_id), ())
        last_audit = service.audit_records()[-1]
        self.assertEqual(
            last_audit.summary,
            "prepared account claim denied: approval required",
        )

    def test_revoked_and_expired_prepared_accounts_cannot_be_claimed(self):
        """Claims against a revoked and an expired prepared account both raise ValidationError."""
        service, store = _service()
        admin = _actor(subject="tenant-admin", roles=("tenant-admin",))
        newcomer = _actor(subject="new-user", email="sample.user@example.test")
        _bootstrap_catalog(service, admin)
        revoked = _prepare_demo_account(service, admin)
        service.revoke_prepared_account(
            admin,
            revoked.prepared_account_id,
            correlation_id="corr-revoke",
        )
        expired = _prepare_demo_account(service, admin)
        service.expire_prepared_account(
            admin,
            expired.prepared_account_id,
            correlation_id="corr-expire",
        )
        registration = _complete_registration(service, newcomer)

        for closed_account in (revoked, expired):
            account_id = closed_account.prepared_account_id
            with self.assertRaises(ValidationError):
                service.claim_prepared_account(
                    newcomer,
                    registration.session.registration_id,
                    prepared_account_id=account_id,
                    correlation_id=f"corr-claim-{account_id}",
                )

        # NOTE(review): source indentation was lost; these checks are assumed to run
        # once after both attempts rather than inside the loop -- confirm.
        self.assertEqual(store.memberships_for_user(registration.user.user_id), ())
        last_audit = service.audit_records()[-1]
        self.assertEqual(
            last_audit.summary,
            "prepared account claim denied: factor mismatch or closed",
        )

    def test_duplicate_pending_prepared_accounts_are_rejected(self):
        """Preparing the same demo account twice raises ConflictError on the second call."""
        service, _ = _service()
        admin = _actor(subject="tenant-admin", roles=("tenant-admin",))
        _bootstrap_catalog(service, admin)
        _prepare_demo_account(service, admin)

        with self.assertRaises(ConflictError):
            _prepare_demo_account(service, admin)

    def test_weak_factor_requirements_are_rejected(self):
        """A whitespace-only normalized factor value is rejected with ValidationError."""
        service, _ = _service()
        admin = _actor(subject="tenant-admin", roles=("tenant-admin",))

        # The requirement is built inside the context manager, so a ValidationError
        # from either its construction or prepare_account satisfies the test.
        with self.assertRaises(ValidationError):
            service.prepare_account(
                admin,
                tenant="tenant:coulomb",
                required_factor_matches=(
                    PreparedFactorRequirement(
                        factor_type=IdentityFactorType.EMAIL,
                        normalized_value=" ",
                    ),
                ),
                entitlements=(_membership_entitlement(),),
                correlation_id="corr-weak-factor",
            )

    def test_revoke_and_list_prepared_accounts(self):
        """A prepared account is listed for its tenant and revoking it returns REVOKED."""
        service, _ = _service()
        admin = _actor(subject="tenant-admin", roles=("tenant-admin",))
        prepared = _prepare_demo_account(service, admin)
        account_id = prepared.prepared_account_id

        listed = service.list_prepared_accounts(
            admin,
            tenant="tenant:coulomb",
            correlation_id="corr-list",
        )
        revoked = service.revoke_prepared_account(
            admin,
            account_id,
            correlation_id="corr-revoke",
        )

        self.assertEqual(listed[0].prepared_account_id, prepared.prepared_account_id)
        self.assertEqual(revoked.status, PreparedAccountStatus.REVOKED)
|
|
|
|
|
|
def _service():
    """Build a ``UserEngineService`` wired to in-memory and fixture adapters.

    Returns:
        A ``(service, store)`` tuple, where ``store`` is the same
        ``InMemoryUserEngineStore`` instance that was handed to the service.
    """
    backing_store = InMemoryUserEngineStore()
    engine = UserEngineService(
        store=backing_store,
        identity_adapter=FixtureIdentityClaimsAdapter(),
        authorization=LocalAuthorizationCheckPort(),
    )
    return engine, backing_store
|
|
|
|
|
|
def _actor(
    *,
    subject: str = "sample-user",
    roles: tuple[str, ...] = ("user",),
    email: str = "sample.user@example.test",
):
    """Return a normalized actor for ``subject`` in tenant ``tenant:coulomb``.

    Args:
        subject: Subject passed to ``human_actor_claims``; also stored as
            ``preferred_username``.
        roles: Roles stored (as a list) under the ``roles`` claim.
        email: Value stored under the ``email`` claim.

    Returns:
        Whatever ``FixtureIdentityClaimsAdapter().normalize`` yields for the claims.
    """
    raw_claims = human_actor_claims(subject=subject, tenant="tenant:coulomb")
    overrides = {
        "roles": list(roles),
        "email": email,
        "preferred_username": subject,
    }
    # Item assignment keeps the same insertion order as setting each key by hand.
    for claim_name, claim_value in overrides.items():
        raw_claims[claim_name] = claim_value
    adapter = FixtureIdentityClaimsAdapter()
    return adapter.normalize(raw_claims)
|
|
|
|
|
|
def _bootstrap_catalog(service: UserEngineService, actor):
    """Register the sample application and publish an ACTIVE catalog for it.

    The published catalog uses the id ``demo-prepared-profile`` and copies its
    namespace, version, owning application and attributes from ``sample_catalog()``.

    Args:
        service: Service that receives the registration and publication calls.
        actor: Actor passed as the caller of both service operations.
    """
    service.register_application(
        actor,
        sample_application(),
        binding=sample_application_binding(),
        correlation_id="corr-app",
    )
    # Build the fixture once so every copied field comes from the same instance
    # instead of constructing four separate sample catalogs.
    template = sample_catalog()
    service.publish_catalog(
        actor,
        Catalog(
            catalog_id="demo-prepared-profile",
            namespace=template.namespace,
            version=template.version,
            owning_application_id=template.owning_application_id,
            lifecycle=CatalogLifecycle.ACTIVE,
            attributes=template.attributes,
        ),
        correlation_id="corr-catalog",
    )
|
|
|
|
|
|
def _complete_registration(
    service: UserEngineService,
    actor,
    *,
    factor_expires_at=None,
):
    """Run a registration for ``actor`` from start to completion with one email factor.

    Args:
        service: Service under test.
        actor: Actor performing every registration step.
        factor_expires_at: Forwarded as ``expires_at`` on the attached
            ``FactorVerification``; ``None`` by default.

    Returns:
        The result of ``service.complete_registration``.
    """
    session = service.start_registration(actor, correlation_id="corr-start")
    registration_id = session.registration_id

    proof_reference = CanonEntityReference(
        concept="Evidence Source",
        identifier="email-proof",
        source_system="fixture-email",
    )
    email_factor = FactorVerification(
        factor_type=IdentityFactorType.EMAIL,
        normalized_value="sample.user@example.test",
        display_value="sample.user@example.test",
        source_system="fixture-email",
        evidence_refs=(proof_reference,),
        expires_at=factor_expires_at,
    )
    service.attach_registration_factor(
        actor,
        registration_id,
        email_factor,
        correlation_id="corr-factor",
    )

    return service.complete_registration(
        actor,
        session.registration_id,
        correlation_id="corr-complete",
    )
|
|
|
|
|
|
def _prepare_demo_account(
    service: UserEngineService,
    preparer,
    *,
    email: str = "sample.user@example.test",
):
    """Prepare the demo account in ``tenant:coulomb`` through ``service.prepare_account``.

    The account requires an email factor matching ``email`` and carries four
    entitlements: an ACTIVE tenant account, the demo team membership, a
    ``demo.display_density`` profile value and the ``welcome-demo`` onboarding journey.

    Args:
        service: Service under test.
        preparer: Actor performing the preparation.
        email: Used for both the factor requirement and ``primary_email``.

    Returns:
        The result of ``service.prepare_account``.
    """
    # The requirement is built first to keep the original argument evaluation order.
    factor_requirements = (_email_requirement(email=email),)

    tenant_account = PreparedEntitlement(
        kind=PreparedEntitlementKind.TENANT_ACCOUNT,
        tenant="tenant:coulomb",
        tenant_account_status=AccountStatus.ACTIVE,
    )
    team_membership = _membership_entitlement()
    display_density = PreparedEntitlement(
        kind=PreparedEntitlementKind.PROFILE_VALUE,
        tenant="tenant:coulomb",
        attribute_key="demo.display_density",
        value="compact",
        profile_scope=ProfileScope.APPLICATION,
        profile_scope_id="app.demo",
    )
    welcome_journey = PreparedEntitlement(
        kind=PreparedEntitlementKind.ONBOARDING_JOURNEY,
        tenant="tenant:coulomb",
        onboarding_journey="welcome-demo",
    )

    return service.prepare_account(
        preparer,
        tenant="tenant:coulomb",
        required_factor_matches=factor_requirements,
        entitlements=(
            tenant_account,
            team_membership,
            display_density,
            welcome_journey,
        ),
        display_name="Prepared User",
        primary_email=email,
        correlation_id="corr-prepare",
    )
|
|
|
|
|
|
def _email_requirement(
    *,
    email: str = "sample.user@example.test",
) -> PreparedFactorRequirement:
    """Return an EMAIL factor requirement for ``email`` sourced from ``fixture-email``.

    Args:
        email: Value used as the requirement's ``normalized_value``.
    """
    requirement = PreparedFactorRequirement(
        factor_type=IdentityFactorType.EMAIL,
        normalized_value=email,
        source_system="fixture-email",
    )
    return requirement
|
|
|
|
|
|
def _membership_entitlement() -> PreparedEntitlement:
    """Return a MEMBERSHIP entitlement granting role ``member`` on ``team:demo``."""
    demo_membership = PreparedEntitlement(
        kind=PreparedEntitlementKind.MEMBERSHIP,
        tenant="tenant:coulomb",
        scope_type="team",
        scope_id="team:demo",
        role="member",
    )
    return demo_membership
|
|
|
|
|
|
# Run the test suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|