Implement identity canon alignment

This commit is contained in:
2026-06-05 16:04:43 +02:00
parent fe446711de
commit c6d211f472
15 changed files with 1008 additions and 21 deletions

View File

@@ -0,0 +1,159 @@
import unittest
from user_engine.adapters.local import InMemoryUserEngineStore, LocalAuthorizationCheckPort
from user_engine.domain import AccountStatus, ProjectionType
from user_engine.errors import AuthorizationDenied
from user_engine.service import UserEngineService
from user_engine.testing.fixtures import (
FixtureIdentityClaimsAdapter,
human_actor_claims,
sample_application,
sample_application_binding,
sample_catalog,
)
from user_engine.testing.scenarios import service_claims
class IdentityCanonAlignmentTests(unittest.TestCase):
def test_identity_context_separates_user_account_subject_and_principal(self):
service, _, _ = _service()
session = _bootstrap(service, _claims(subject="ada-admin", tenant="tenant:acme"))
service.add_membership(
session.actor,
session.user.user_id,
tenant="tenant:acme",
scope_type="team",
scope_id="team:platform",
kind="tenant-admin",
correlation_id="corr-acme-admin",
)
context = service.identity_context(
session.actor,
application_id="app.demo",
include_profile=True,
correlation_id="corr-context",
)
self.assertEqual(context.entity_refs["user"].concept, "User")
self.assertEqual(context.entity_refs["account"].concept, "Account")
self.assertEqual(
context.entity_refs["authenticated_subject"].concept,
"Authenticated Subject",
)
self.assertEqual(
context.entity_refs["authorization_principal"].concept,
"Authorization Principal",
)
self.assertNotEqual(
context.entity_refs["account"].identifier,
context.entity_refs["authenticated_subject"].identifier,
)
self.assertEqual(context.entity_refs["team:team:platform"].concept, "Team")
self.assertEqual(context.profile.values["demo.display_density"], "comfortable")
self.assertFalse(context.gaps)
relationship_types = {
relationship.relationship_type for relationship in context.relationship_refs
}
self.assertIn("authenticates_as", relationship_types)
self.assertIn("evaluated_as", relationship_types)
self.assertIn("member_of", relationship_types)
self.assertIn("scoped_to", relationship_types)
self.assertEqual(context.grant_like_refs[0].concept, "Access Grant")
self.assertEqual(context.grant_like_refs[0].local_type, "tenant-admin")
self.assertTrue(context.evidence_refs)
def test_identity_context_handles_service_account_projection(self):
service, _, _ = _service()
session = _bootstrap(service, service_claims())
context = service.identity_context(
session.actor,
correlation_id="corr-service-context",
)
self.assertEqual(context.entity_refs["actor"].local_type, "service")
self.assertEqual(context.entity_refs["account"].concept, "Account")
self.assertEqual(
context.entity_refs["authorization_principal"].local_type,
"service",
)
def test_small_saas_tenant_context_denies_globex_without_scope(self):
service, _, _ = _service()
session = _bootstrap(service, _claims(subject="ada-admin", tenant="tenant:acme"))
service.set_tenant_account_status(
session.actor,
session.user.user_id,
AccountStatus.ACTIVE,
tenant="tenant:acme",
correlation_id="corr-acme-account",
)
with self.assertRaises(AuthorizationDenied):
service.identity_context(
session.actor,
tenant="tenant:globex",
correlation_id="corr-globex-context",
)
def test_identity_context_can_be_read_as_claims_enrichment_input(self):
service, _, authz = _service()
session = _bootstrap(service, _claims(subject="claim-user", tenant="tenant:acme"))
projection = service.projection(
session.actor,
session.user.user_id,
ProjectionType.CLAIMS_ENRICHMENT,
application_id="app.demo",
tenant="tenant:acme",
correlation_id="corr-claims",
)
context = service.identity_context(
session.actor,
application_id="app.demo",
include_profile=True,
correlation_id="corr-claims-context",
)
self.assertEqual(projection.values["demo.display_density"], "comfortable")
self.assertEqual(context.application_id, "app.demo")
self.assertIn("identity_context.read", [request.action for request in authz.requests])
def _service():
store = InMemoryUserEngineStore()
authz = LocalAuthorizationCheckPort()
service = UserEngineService(
store=store,
identity_adapter=FixtureIdentityClaimsAdapter(),
authorization=authz,
)
return service, store, authz
def _bootstrap(service: UserEngineService, claims: dict[str, object]):
session = service.me(claims, correlation_id="corr-me")
service.register_application(
session.actor,
sample_application(),
binding=sample_application_binding(),
correlation_id="corr-app",
)
service.publish_catalog(
session.actor,
sample_catalog(),
correlation_id="corr-catalog",
)
return session
def _claims(*, subject: str, tenant: str) -> dict[str, object]:
claims = human_actor_claims(subject=subject, tenant=tenant)
claims["roles"] = ["tenant-admin"]
return claims
if __name__ == "__main__":
unittest.main()