import unittest

from user_engine.adapters.local import InMemoryUserEngineStore, LocalAuthorizationCheckPort
from user_engine.domain import AccountStatus, ProjectionType
from user_engine.errors import AuthorizationDenied
from user_engine.service import UserEngineService
from user_engine.testing.fixtures import (
    FixtureIdentityClaimsAdapter,
    human_actor_claims,
    sample_application,
    sample_application_binding,
    sample_catalog,
)
from user_engine.testing.scenarios import service_claims


class IdentityCanonAlignmentTests(unittest.TestCase):
    """Checks on the references returned by ``UserEngineService.identity_context``.

    Every test builds a fresh in-memory service via ``_service`` and signs an
    actor in through ``_bootstrap`` before querying the identity context.
    """

    def test_identity_context_separates_user_account_subject_and_principal(self):
        # A tenant admin with a team membership: the context must expose the
        # user, account, authenticated subject and authorization principal as
        # separately typed references, plus the team and grant-like entries.
        engine = _service()[0]
        admin_claims = _claims(subject="ada-admin", tenant="tenant:acme")
        signed_in = _bootstrap(engine, admin_claims)
        engine.add_membership(
            signed_in.actor,
            signed_in.user.user_id,
            tenant="tenant:acme",
            scope_type="team",
            scope_id="team:platform",
            kind="tenant-admin",
            correlation_id="corr-acme-admin",
        )

        ctx = engine.identity_context(
            signed_in.actor,
            application_id="app.demo",
            include_profile=True,
            correlation_id="corr-context",
        )

        # Checked in order; the first mismatch aborts the test.
        expected_concepts = (
            ("user", "User"),
            ("account", "Account"),
            ("authenticated_subject", "Authenticated Subject"),
            ("authorization_principal", "Authorization Principal"),
        )
        for ref_key, concept in expected_concepts:
            self.assertEqual(ctx.entity_refs[ref_key].concept, concept)

        # The account and the authenticated subject must not share an identifier.
        self.assertNotEqual(
            ctx.entity_refs["account"].identifier,
            ctx.entity_refs["authenticated_subject"].identifier,
        )
        self.assertEqual(ctx.entity_refs["team:team:platform"].concept, "Team")
        self.assertEqual(ctx.profile.values["demo.display_density"], "comfortable")
        self.assertFalse(ctx.gaps)

        relationship_types = set()
        for link in ctx.relationship_refs:
            relationship_types.add(link.relationship_type)
        for expected_type in ("authenticates_as", "evaluated_as", "member_of", "scoped_to"):
            self.assertIn(expected_type, relationship_types)

        self.assertEqual(ctx.grant_like_refs[0].concept, "Access Grant")
        self.assertEqual(ctx.grant_like_refs[0].local_type, "tenant-admin")
        self.assertTrue(ctx.evidence_refs)

    def test_identity_context_handles_service_account_projection(self):
        # Signing in with service claims: the actor and authorization principal
        # carry the "service" local type while the account is still an Account.
        engine = _service()[0]
        signed_in = _bootstrap(engine, service_claims())

        ctx = engine.identity_context(
            signed_in.actor,
            correlation_id="corr-service-context",
        )

        self.assertEqual(ctx.entity_refs["actor"].local_type, "service")
        self.assertEqual(ctx.entity_refs["account"].concept, "Account")
        self.assertEqual(
            ctx.entity_refs["authorization_principal"].local_type,
            "service",
        )

    def test_small_saas_tenant_context_denies_globex_without_scope(self):
        # The actor is set up for tenant:acme only; asking for the
        # tenant:globex context must raise AuthorizationDenied.
        engine = _service()[0]
        acme_claims = _claims(subject="ada-admin", tenant="tenant:acme")
        signed_in = _bootstrap(engine, acme_claims)
        engine.set_tenant_account_status(
            signed_in.actor,
            signed_in.user.user_id,
            AccountStatus.ACTIVE,
            tenant="tenant:acme",
            correlation_id="corr-acme-account",
        )

        self.assertRaises(
            AuthorizationDenied,
            engine.identity_context,
            signed_in.actor,
            tenant="tenant:globex",
            correlation_id="corr-globex-context",
        )

    def test_identity_context_can_be_read_as_claims_enrichment_input(self):
        # Fetch a claims-enrichment projection and then the identity context
        # for the same application, and verify that an
        # "identity_context.read" action was recorded by the authorization port.
        engine, _store, authz = _service()
        user_claims = _claims(subject="claim-user", tenant="tenant:acme")
        signed_in = _bootstrap(engine, user_claims)

        enrichment = engine.projection(
            signed_in.actor,
            signed_in.user.user_id,
            ProjectionType.CLAIMS_ENRICHMENT,
            application_id="app.demo",
            tenant="tenant:acme",
            correlation_id="corr-claims",
        )
        ctx = engine.identity_context(
            signed_in.actor,
            application_id="app.demo",
            include_profile=True,
            correlation_id="corr-claims-context",
        )

        self.assertEqual(enrichment.values["demo.display_density"], "comfortable")
        self.assertEqual(ctx.application_id, "app.demo")
        recorded_actions = [req.action for req in authz.requests]
        self.assertIn("identity_context.read", recorded_actions)


def _service():
    """Build a service on local adapters and return ``(service, store, authz)``.

    The store and authorization port are returned alongside the service so a
    test can inspect them after exercising the service.
    """
    backing_store = InMemoryUserEngineStore()
    authz_port = LocalAuthorizationCheckPort()
    engine = UserEngineService(
        store=backing_store,
        identity_adapter=FixtureIdentityClaimsAdapter(),
        authorization=authz_port,
    )
    return engine, backing_store, authz_port


def _bootstrap(service: UserEngineService, claims: dict[str, object]):
    """Sign in with ``claims``, register the sample app, publish the sample catalog.

    Returns the session object produced by ``service.me``; the registration and
    catalog publication are both performed as that session's actor.
    """
    opened = service.me(claims, correlation_id="corr-me")
    acting_as = opened.actor
    service.register_application(
        acting_as,
        sample_application(),
        binding=sample_application_binding(),
        correlation_id="corr-app",
    )
    service.publish_catalog(
        acting_as,
        sample_catalog(),
        correlation_id="corr-catalog",
    )
    return opened


def _claims(*, subject: str, tenant: str) -> dict[str, object]:
    """Return human-actor fixture claims with ``roles`` set to ``["tenant-admin"]``."""
    payload = human_actor_claims(subject=subject, tenant=tenant)
    payload["roles"] = ["tenant-admin"]
    return payload


if __name__ == "__main__":
    unittest.main()