"""Tests for access-profile registration, active-hat selection and projections."""

import unittest

from user_engine.adapters.local import InMemoryUserEngineStore, LocalAuthorizationCheckPort
from user_engine.domain import (
    AccessMembershipRequirement,
    AccessProfile,
    AccessScopeType,
    IdentityFactor,
    IdentityFactorType,
    ProjectionType,
)
from user_engine.errors import AuthorizationDenied, ValidationError
from user_engine.service import UserEngineService
from user_engine.testing.fixtures import (
    FixtureIdentityClaimsAdapter,
    human_actor_claims,
    sample_application,
    sample_application_binding,
    sample_catalog,
)


class AccessProfileTests(unittest.TestCase):
    """Exercise access profiles through ``UserEngineService`` with in-memory adapters."""

    def test_select_active_hat_updates_identity_context_and_projection(self):
        """Selecting a hat is reflected in the selection, identity context and projection."""
        service, store, _ = _service()
        session = _bootstrap(service)
        _grant_realm_operator_membership(service, session)
        store.save_identity_factor(_email_factor(session.user.user_id))
        profile = service.register_access_profile(
            session.actor,
            _realm_operator_profile(),
            correlation_id="corr-profile-register",
        )

        selection = service.select_active_hat(
            session.actor,
            session.user.user_id,
            profile.access_profile_id,
            correlation_id="corr-select-hat",
        )
        context = service.identity_context(
            session.actor,
            application_id="app.demo",
            include_profile=True,
            correlation_id="corr-identity-context",
        )
        projection = service.projection(
            session.actor,
            session.user.user_id,
            ProjectionType.CLAIMS_ENRICHMENT,
            application_id="app.demo",
            tenant="tenant:coulomb",
            correlation_id="corr-projection",
        )

        active = selection.active_context
        self.assertEqual(active.hat, "operator")
        self.assertEqual(active.realm_id, "realm:citadel")
        self.assertEqual(active.service_id, "app.demo")
        self.assertEqual(active.asset_id, "asset:ledger")
        self.assertTrue(active.factor_ids[0].startswith("fac_"))
        self.assertEqual(context.active_access_context.hat, "operator")
        self.assertEqual(context.entity_refs["active_hat"].concept, "Hat")
        self.assertIn(
            "wears_hat",
            {relationship.relationship_type for relationship in context.relationship_refs},
        )
        self.assertTrue(
            any(fact.scope_id == "realm:citadel" for fact in context.access_control_facts)
        )
        self.assertEqual(projection.access_context["active_hat"], "operator")
        self.assertEqual(projection.access_context["claims"]["service_role"], "operator")
        # The raw e-mail address must not appear in any outbox event payload.
        self.assertNotIn("ada@example.test", _outbox_payloads_repr(service))

    def test_cross_tenant_access_profile_denied(self):
        """Registering a profile for a tenant other than the actor's is denied."""
        service, _, _ = _service()
        session = _bootstrap(service)

        with self.assertRaises(AuthorizationDenied):
            service.register_access_profile(
                session.actor,
                AccessProfile(
                    tenant="tenant:faraday",
                    display_name="Faraday Operator",
                    hat="operator",
                    membership_requirements=(
                        AccessMembershipRequirement(
                            scope_type="realm",
                            scope_id="realm:faraday",
                            kind="operator",
                        ),
                    ),
                ),
                correlation_id="corr-cross-tenant",
            )

    def test_missing_factor_assurance_fails_closed(self):
        """Selecting a profile that requires an EID factor raises and stores no context."""
        service, store, _ = _service()
        session = _bootstrap(service)
        _grant_realm_operator_membership(service, session)
        profile = service.register_access_profile(
            session.actor,
            AccessProfile(
                tenant="tenant:coulomb",
                display_name="High Assurance Operator",
                hat="operator",
                scope_type=AccessScopeType.REALM,
                scope_id="realm:citadel",
                realm_id="realm:citadel",
                membership_requirements=(
                    AccessMembershipRequirement(
                        scope_type="realm",
                        scope_id="realm:citadel",
                        kind="operator",
                    ),
                ),
                required_factor_types=(IdentityFactorType.EID,),
            ),
            correlation_id="corr-high-assurance",
        )

        with self.assertRaises(ValidationError):
            service.select_active_hat(
                session.actor,
                session.user.user_id,
                profile.access_profile_id,
                correlation_id="corr-select-missing-factor",
            )

        # A failed selection must not leave an active context behind in the store.
        self.assertIsNone(
            store.active_access_context(session.user.user_id, "tenant:coulomb")
        )

    def test_group_derived_access_exports_group_facts(self):
        """A hat gated on group membership yields a group-subject fact in the export."""
        service, _, _ = _service()
        session = _bootstrap(service)
        service.add_membership(
            session.actor,
            session.user.user_id,
            tenant="tenant:coulomb",
            scope_type="group",
            scope_id="group:research",
            kind="member",
            correlation_id="corr-group-membership",
        )
        profile = service.register_access_profile(
            session.actor,
            AccessProfile(
                tenant="tenant:coulomb",
                display_name="Research Service Hat",
                hat="researcher",
                scope_type=AccessScopeType.SERVICE,
                scope_id="app.demo",
                service_id="app.demo",
                membership_requirements=(
                    AccessMembershipRequirement(
                        scope_type="group",
                        scope_id="group:research",
                        kind="member",
                    ),
                ),
                group_scope_ids=("group:research",),
            ),
            correlation_id="corr-research-profile",
        )
        service.select_active_hat(
            session.actor,
            session.user.user_id,
            profile.access_profile_id,
            correlation_id="corr-select-researcher",
        )

        export = service.export_access_control_facts(
            session.actor,
            tenant="tenant:coulomb",
            user_id=session.user.user_id,
            correlation_id="corr-export-facts",
        )

        self.assertIn("group", export.manifest["subject_types"])
        self.assertTrue(
            any(
                fact.subject_type == "group"
                and fact.subject_id == "group:research"
                and fact.scope_id == "app.demo"
                for fact in export.facts
            )
        )

    def test_service_specific_projection_filters_other_services(self):
        """A projection for a different application carries an empty access context."""
        service, store, _ = _service()
        session = _bootstrap(service)
        _grant_realm_operator_membership(service, session)
        store.save_identity_factor(_email_factor(session.user.user_id))
        profile = service.register_access_profile(
            session.actor,
            _realm_operator_profile(),
            correlation_id="corr-profile-register",
        )
        service.select_active_hat(
            session.actor,
            session.user.user_id,
            profile.access_profile_id,
            correlation_id="corr-select-hat",
        )

        # The selected profile is bound to "app.demo"; ask for another application.
        projection = service.projection(
            session.actor,
            session.user.user_id,
            ProjectionType.CLAIMS_ENRICHMENT,
            application_id="app.other",
            tenant="tenant:coulomb",
            correlation_id="corr-other-service",
        )

        self.assertEqual(projection.access_context, {})

    def test_access_profile_diagnostics_are_redacted(self):
        """Diagnostics and outbox payloads omit profile default and claim values."""
        service, _, _ = _service()
        session = _bootstrap(service)
        profile = service.register_access_profile(
            session.actor,
            AccessProfile(
                tenant="tenant:coulomb",
                display_name="Sensitive Defaults",
                hat="operator",
                membership_requirements=(
                    AccessMembershipRequirement(
                        scope_type="realm",
                        scope_id="realm:citadel",
                        kind="operator",
                    ),
                ),
                profile_defaults={"internal_hint": "secret-default"},
                claims={"private_claim": "secret-claim"},
            ),
            correlation_id="corr-sensitive-profile",
        )

        diagnostics = service.access_profile_diagnostics(
            session.actor,
            tenant="tenant:coulomb",
            correlation_id="corr-access-diagnostics",
        )

        self.assertEqual(diagnostics.profile_count, 1)
        self.assertIn(profile.access_profile_id, diagnostics.required_factor_types)

        # Render each surface once so both secrets are checked against the
        # same snapshot; subTest reports every leaking value, not just the first.
        diagnostics_repr = repr(diagnostics)
        outbox_repr = _outbox_payloads_repr(service)
        for secret in ("secret-default", "secret-claim"):
            with self.subTest(secret=secret):
                self.assertNotIn(secret, diagnostics_repr)
                self.assertNotIn(secret, outbox_repr)


def _service():
    """Build a ``UserEngineService`` on in-memory adapters.

    Returns:
        A ``(service, store, authz)`` tuple so tests can reach the backing
        store and authorization port directly.
    """
    store = InMemoryUserEngineStore()
    authz = LocalAuthorizationCheckPort()
    service = UserEngineService(
        store=store,
        identity_adapter=FixtureIdentityClaimsAdapter(),
        authorization=authz,
    )
    return service, store, authz


def _bootstrap(service: UserEngineService):
    """Open a session from ``_claims()`` and register the sample application and catalog.

    Returns:
        The session object returned by ``service.me``.
    """
    session = service.me(_claims(), correlation_id="corr-me")
    service.register_application(
        session.actor,
        sample_application(),
        binding=sample_application_binding(),
        correlation_id="corr-app",
    )
    service.publish_catalog(
        session.actor,
        sample_catalog(),
        correlation_id="corr-catalog",
    )
    return session


def _claims():
    """Return fixture claims for subject ``ada`` with the ``tenant-admin`` role and an e-mail."""
    claims = human_actor_claims(subject="ada", tenant="tenant:coulomb")
    claims["roles"] = ["tenant-admin"]
    claims["email"] = "ada@example.test"
    return claims


def _grant_realm_operator_membership(service: UserEngineService, session) -> None:
    """Add the session user as ``operator`` of ``realm:citadel`` in ``tenant:coulomb``."""
    service.add_membership(
        session.actor,
        session.user.user_id,
        tenant="tenant:coulomb",
        scope_type="realm",
        scope_id="realm:citadel",
        kind="operator",
        correlation_id="corr-realm-membership",
    )


def _outbox_payloads_repr(service: UserEngineService) -> str:
    """Return ``repr`` of the payloads of every event in ``service.outbox_events()``."""
    return repr([event.payload for event in service.outbox_events()])


def _email_factor(user_id: str) -> IdentityFactor:
    """Return an e-mail ``IdentityFactor`` for *user_id* using the fixture address."""
    return IdentityFactor(
        factor_type=IdentityFactorType.EMAIL,
        normalized_value="ada@example.test",
        user_id=user_id,
        display_value="ada@example.test",
        source_system="fixture-email",
    )


def _realm_operator_profile() -> AccessProfile:
    """Return the realm-scoped ``operator`` profile shared by several tests.

    The profile requires an ``operator`` membership in ``realm:citadel`` and an
    e-mail identity factor, and is bound to service ``app.demo`` and asset
    ``asset:ledger``.
    """
    return AccessProfile(
        tenant="tenant:coulomb",
        display_name="Realm Operator",
        hat="operator",
        scope_type=AccessScopeType.REALM,
        scope_id="realm:citadel",
        realm_id="realm:citadel",
        service_id="app.demo",
        asset_id="asset:ledger",
        membership_requirements=(
            AccessMembershipRequirement(
                scope_type="realm",
                scope_id="realm:citadel",
                kind="operator",
            ),
        ),
        required_factor_types=(IdentityFactorType.EMAIL,),
        profile_defaults={"workspace_mode": "ops"},
        claims={"service_role": "operator"},
    )


if __name__ == "__main__":
    unittest.main()