generated from coulomb/repo-seed
160 lines
5.6 KiB
Python
160 lines
5.6 KiB
Python
import unittest
|
|
|
|
from user_engine.adapters.local import InMemoryUserEngineStore, LocalAuthorizationCheckPort
|
|
from user_engine.domain import AccountStatus, ProjectionType
|
|
from user_engine.errors import AuthorizationDenied
|
|
from user_engine.service import UserEngineService
|
|
from user_engine.testing.fixtures import (
|
|
FixtureIdentityClaimsAdapter,
|
|
human_actor_claims,
|
|
sample_application,
|
|
sample_application_binding,
|
|
sample_catalog,
|
|
)
|
|
from user_engine.testing.scenarios import service_claims
|
|
|
|
|
|
class IdentityCanonAlignmentTests(unittest.TestCase):
|
|
def test_identity_context_separates_user_account_subject_and_principal(self):
|
|
service, _, _ = _service()
|
|
session = _bootstrap(service, _claims(subject="ada-admin", tenant="tenant:acme"))
|
|
service.add_membership(
|
|
session.actor,
|
|
session.user.user_id,
|
|
tenant="tenant:acme",
|
|
scope_type="team",
|
|
scope_id="team:platform",
|
|
kind="tenant-admin",
|
|
correlation_id="corr-acme-admin",
|
|
)
|
|
|
|
context = service.identity_context(
|
|
session.actor,
|
|
application_id="app.demo",
|
|
include_profile=True,
|
|
correlation_id="corr-context",
|
|
)
|
|
|
|
self.assertEqual(context.entity_refs["user"].concept, "User")
|
|
self.assertEqual(context.entity_refs["account"].concept, "Account")
|
|
self.assertEqual(
|
|
context.entity_refs["authenticated_subject"].concept,
|
|
"Authenticated Subject",
|
|
)
|
|
self.assertEqual(
|
|
context.entity_refs["authorization_principal"].concept,
|
|
"Authorization Principal",
|
|
)
|
|
self.assertNotEqual(
|
|
context.entity_refs["account"].identifier,
|
|
context.entity_refs["authenticated_subject"].identifier,
|
|
)
|
|
self.assertEqual(context.entity_refs["team:team:platform"].concept, "Team")
|
|
self.assertEqual(context.profile.values["demo.display_density"], "comfortable")
|
|
self.assertFalse(context.gaps)
|
|
|
|
relationship_types = {
|
|
relationship.relationship_type for relationship in context.relationship_refs
|
|
}
|
|
self.assertIn("authenticates_as", relationship_types)
|
|
self.assertIn("evaluated_as", relationship_types)
|
|
self.assertIn("member_of", relationship_types)
|
|
self.assertIn("scoped_to", relationship_types)
|
|
self.assertEqual(context.grant_like_refs[0].concept, "Access Grant")
|
|
self.assertEqual(context.grant_like_refs[0].local_type, "tenant-admin")
|
|
self.assertTrue(context.evidence_refs)
|
|
|
|
def test_identity_context_handles_service_account_projection(self):
|
|
service, _, _ = _service()
|
|
session = _bootstrap(service, service_claims())
|
|
|
|
context = service.identity_context(
|
|
session.actor,
|
|
correlation_id="corr-service-context",
|
|
)
|
|
|
|
self.assertEqual(context.entity_refs["actor"].local_type, "service")
|
|
self.assertEqual(context.entity_refs["account"].concept, "Account")
|
|
self.assertEqual(
|
|
context.entity_refs["authorization_principal"].local_type,
|
|
"service",
|
|
)
|
|
|
|
def test_small_saas_tenant_context_denies_globex_without_scope(self):
|
|
service, _, _ = _service()
|
|
session = _bootstrap(service, _claims(subject="ada-admin", tenant="tenant:acme"))
|
|
service.set_tenant_account_status(
|
|
session.actor,
|
|
session.user.user_id,
|
|
AccountStatus.ACTIVE,
|
|
tenant="tenant:acme",
|
|
correlation_id="corr-acme-account",
|
|
)
|
|
|
|
with self.assertRaises(AuthorizationDenied):
|
|
service.identity_context(
|
|
session.actor,
|
|
tenant="tenant:globex",
|
|
correlation_id="corr-globex-context",
|
|
)
|
|
|
|
def test_identity_context_can_be_read_as_claims_enrichment_input(self):
|
|
service, _, authz = _service()
|
|
session = _bootstrap(service, _claims(subject="claim-user", tenant="tenant:acme"))
|
|
|
|
projection = service.projection(
|
|
session.actor,
|
|
session.user.user_id,
|
|
ProjectionType.CLAIMS_ENRICHMENT,
|
|
application_id="app.demo",
|
|
tenant="tenant:acme",
|
|
correlation_id="corr-claims",
|
|
)
|
|
context = service.identity_context(
|
|
session.actor,
|
|
application_id="app.demo",
|
|
include_profile=True,
|
|
correlation_id="corr-claims-context",
|
|
)
|
|
|
|
self.assertEqual(projection.values["demo.display_density"], "comfortable")
|
|
self.assertEqual(context.application_id, "app.demo")
|
|
self.assertIn("identity_context.read", [request.action for request in authz.requests])
|
|
|
|
|
|
def _service():
|
|
store = InMemoryUserEngineStore()
|
|
authz = LocalAuthorizationCheckPort()
|
|
service = UserEngineService(
|
|
store=store,
|
|
identity_adapter=FixtureIdentityClaimsAdapter(),
|
|
authorization=authz,
|
|
)
|
|
return service, store, authz
|
|
|
|
|
|
def _bootstrap(service: UserEngineService, claims: dict[str, object]):
|
|
session = service.me(claims, correlation_id="corr-me")
|
|
service.register_application(
|
|
session.actor,
|
|
sample_application(),
|
|
binding=sample_application_binding(),
|
|
correlation_id="corr-app",
|
|
)
|
|
service.publish_catalog(
|
|
session.actor,
|
|
sample_catalog(),
|
|
correlation_id="corr-catalog",
|
|
)
|
|
return session
|
|
|
|
|
|
def _claims(*, subject: str, tenant: str) -> dict[str, object]:
|
|
claims = human_actor_claims(subject=subject, tenant=tenant)
|
|
claims["roles"] = ["tenant-admin"]
|
|
return claims
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|