Files
user-engine/tests/test_registration_identity.py

176 lines
6.3 KiB
Python

import unittest
from user_engine.adapters.local import InMemoryUserEngineStore, LocalAuthorizationCheckPort
from user_engine.domain import (
CanonEntityReference,
FactorVerification,
IdentityFactorType,
RegistrationStatus,
)
from user_engine.errors import ValidationError
from user_engine.service import UserEngineService
from user_engine.testing.fixtures import FixtureIdentityClaimsAdapter, human_actor_claims
class RegistrationIdentityTests(unittest.TestCase):
def test_registration_with_verified_email_creates_netkingdom_id(self):
service, store = _service()
actor = _actor()
session = service.start_registration(actor, correlation_id="corr-start")
verified = service.attach_registration_factor(
actor,
session.registration_id,
_verified_email(),
correlation_id="corr-factor",
)
completion = service.complete_registration(
actor,
session.registration_id,
correlation_id="corr-complete",
)
self.assertEqual(verified.status, RegistrationStatus.FACTOR_VERIFIED)
self.assertEqual(completion.session.status, RegistrationStatus.COMPLETED)
self.assertEqual(completion.netkingdom_id, completion.user.user_id)
self.assertEqual(completion.session.netkingdom_id, completion.user.user_id)
self.assertEqual(completion.user.primary_email, "sample.user@example.test")
self.assertEqual(completion.identity_context.user.user_id, completion.user.user_id)
self.assertEqual(store.find_identity(*actor.identity_key).user_id, completion.user.user_id)
self.assertEqual(
store.factors_for_user(completion.user.user_id)[0].factor_type,
IdentityFactorType.EMAIL,
)
self.assertNotIn(
"sample.user@example.test",
repr([event.payload for event in service.outbox_events()]),
)
def test_registration_requires_all_required_factors_before_completion(self):
service, store = _service()
actor = _actor()
session = service.start_registration(
actor,
required_factor_types=(IdentityFactorType.EID,),
correlation_id="corr-start",
)
service.attach_registration_factor(
actor,
session.registration_id,
_verified_email(),
correlation_id="corr-factor",
)
with self.assertRaises(ValidationError):
service.complete_registration(
actor,
session.registration_id,
correlation_id="corr-complete",
)
self.assertEqual(store.record_counts()["users"], 0)
self.assertIsNone(store.registration_session(session.registration_id).user_id)
def test_factor_verifier_adapter_normalizes_external_proofing_result(self):
service, store = _service(factor_verifier=_FixtureFactorVerifier())
actor = _actor()
session = service.start_registration(actor, correlation_id="corr-start")
updated = service.attach_registration_factor(
actor,
session.registration_id,
{
"type": "email",
"value": "Sample.User@Example.Test",
"secret_challenge": "do-not-store-this",
},
correlation_id="corr-factor",
)
stored = store.factors_for_registration(session.registration_id)[0]
self.assertEqual(updated.status, RegistrationStatus.FACTOR_VERIFIED)
self.assertEqual(stored.normalized_value, "sample.user@example.test")
self.assertNotIn("do-not-store-this", repr(stored))
self.assertNotIn(
"do-not-store-this",
repr([event.payload for event in service.outbox_events()]),
)
def test_abandoned_registration_cannot_resume_or_complete(self):
service, _ = _service()
actor = _actor()
session = service.start_registration(actor, correlation_id="corr-start")
abandoned = service.abandon_registration(
actor,
session.registration_id,
correlation_id="corr-abandon",
)
diagnostics = service.registration_diagnostics(
actor,
tenant="tenant:coulomb",
correlation_id="corr-diagnostics",
)
self.assertEqual(abandoned.status, RegistrationStatus.ABANDONED)
self.assertEqual(diagnostics.statuses[RegistrationStatus.ABANDONED.value], 1)
with self.assertRaises(ValidationError):
service.resume_registration(
actor,
session.registration_id,
correlation_id="corr-resume",
)
with self.assertRaises(ValidationError):
service.complete_registration(
actor,
session.registration_id,
correlation_id="corr-complete",
)
def _service(*, factor_verifier=None):
store = InMemoryUserEngineStore()
service = UserEngineService(
store=store,
identity_adapter=FixtureIdentityClaimsAdapter(),
authorization=LocalAuthorizationCheckPort(),
factor_verifier=factor_verifier,
)
return service, store
def _actor():
return FixtureIdentityClaimsAdapter().normalize(human_actor_claims())
def _verified_email() -> FactorVerification:
return FactorVerification(
factor_type=IdentityFactorType.EMAIL,
normalized_value="sample.user@example.test",
display_value="sample.user@example.test",
source_system="fixture-email",
assurance={"level": "email_verified"},
evidence_refs=(
CanonEntityReference(
concept="Evidence Source",
identifier="fixture-email-proof",
source_system="fixture-email",
),
),
)
class _FixtureFactorVerifier:
def normalize(self, proofing_result):
return FactorVerification(
factor_type=IdentityFactorType(str(proofing_result["type"])),
normalized_value=str(proofing_result["value"]).lower(),
display_value=str(proofing_result["value"]).lower(),
source_system="fixture-proofing",
assurance={"level": "email_verified"},
)
if __name__ == "__main__":
unittest.main()