Files
user-engine/tests/test_access_profiles.py

355 lines
12 KiB
Python

import unittest
from user_engine.adapters.local import InMemoryUserEngineStore, LocalAuthorizationCheckPort
from user_engine.domain import (
AccessMembershipRequirement,
AccessProfile,
AccessScopeType,
IdentityFactor,
IdentityFactorType,
ProjectionType,
)
from user_engine.errors import AuthorizationDenied, ValidationError
from user_engine.service import UserEngineService
from user_engine.testing.fixtures import (
FixtureIdentityClaimsAdapter,
human_actor_claims,
sample_application,
sample_application_binding,
sample_catalog,
)
class AccessProfileTests(unittest.TestCase):
def test_select_active_hat_updates_identity_context_and_projection(self):
service, store, _ = _service()
session = _bootstrap(service)
service.add_membership(
session.actor,
session.user.user_id,
tenant="tenant:coulomb",
scope_type="realm",
scope_id="realm:citadel",
kind="operator",
correlation_id="corr-realm-membership",
)
store.save_identity_factor(_email_factor(session.user.user_id))
profile = service.register_access_profile(
session.actor,
_realm_operator_profile(),
correlation_id="corr-profile-register",
)
selection = service.select_active_hat(
session.actor,
session.user.user_id,
profile.access_profile_id,
correlation_id="corr-select-hat",
)
context = service.identity_context(
session.actor,
application_id="app.demo",
include_profile=True,
correlation_id="corr-identity-context",
)
projection = service.projection(
session.actor,
session.user.user_id,
ProjectionType.CLAIMS_ENRICHMENT,
application_id="app.demo",
tenant="tenant:coulomb",
correlation_id="corr-projection",
)
self.assertEqual(selection.active_context.hat, "operator")
self.assertEqual(selection.active_context.realm_id, "realm:citadel")
self.assertEqual(selection.active_context.service_id, "app.demo")
self.assertEqual(selection.active_context.asset_id, "asset:ledger")
self.assertEqual(selection.active_context.factor_ids[0].startswith("fac_"), True)
self.assertEqual(context.active_access_context.hat, "operator")
self.assertEqual(context.entity_refs["active_hat"].concept, "Hat")
self.assertIn(
"wears_hat",
{relationship.relationship_type for relationship in context.relationship_refs},
)
self.assertTrue(
any(fact.scope_id == "realm:citadel" for fact in context.access_control_facts)
)
self.assertEqual(projection.access_context["active_hat"], "operator")
self.assertEqual(projection.access_context["claims"]["service_role"], "operator")
self.assertNotIn(
"ada@example.test",
repr([event.payload for event in service.outbox_events()]),
)
def test_cross_tenant_access_profile_denied(self):
service, _, _ = _service()
session = _bootstrap(service)
with self.assertRaises(AuthorizationDenied):
service.register_access_profile(
session.actor,
AccessProfile(
tenant="tenant:faraday",
display_name="Faraday Operator",
hat="operator",
membership_requirements=(
AccessMembershipRequirement(
scope_type="realm",
scope_id="realm:faraday",
kind="operator",
),
),
),
correlation_id="corr-cross-tenant",
)
def test_missing_factor_assurance_fails_closed(self):
service, store, _ = _service()
session = _bootstrap(service)
service.add_membership(
session.actor,
session.user.user_id,
tenant="tenant:coulomb",
scope_type="realm",
scope_id="realm:citadel",
kind="operator",
correlation_id="corr-realm-membership",
)
profile = service.register_access_profile(
session.actor,
AccessProfile(
tenant="tenant:coulomb",
display_name="High Assurance Operator",
hat="operator",
scope_type=AccessScopeType.REALM,
scope_id="realm:citadel",
realm_id="realm:citadel",
membership_requirements=(
AccessMembershipRequirement(
scope_type="realm",
scope_id="realm:citadel",
kind="operator",
),
),
required_factor_types=(IdentityFactorType.EID,),
),
correlation_id="corr-high-assurance",
)
with self.assertRaises(ValidationError):
service.select_active_hat(
session.actor,
session.user.user_id,
profile.access_profile_id,
correlation_id="corr-select-missing-factor",
)
self.assertIsNone(
store.active_access_context(session.user.user_id, "tenant:coulomb")
)
def test_group_derived_access_exports_group_facts(self):
service, _, _ = _service()
session = _bootstrap(service)
service.add_membership(
session.actor,
session.user.user_id,
tenant="tenant:coulomb",
scope_type="group",
scope_id="group:research",
kind="member",
correlation_id="corr-group-membership",
)
profile = service.register_access_profile(
session.actor,
AccessProfile(
tenant="tenant:coulomb",
display_name="Research Service Hat",
hat="researcher",
scope_type=AccessScopeType.SERVICE,
scope_id="app.demo",
service_id="app.demo",
membership_requirements=(
AccessMembershipRequirement(
scope_type="group",
scope_id="group:research",
kind="member",
),
),
group_scope_ids=("group:research",),
),
correlation_id="corr-research-profile",
)
service.select_active_hat(
session.actor,
session.user.user_id,
profile.access_profile_id,
correlation_id="corr-select-researcher",
)
export = service.export_access_control_facts(
session.actor,
tenant="tenant:coulomb",
user_id=session.user.user_id,
correlation_id="corr-export-facts",
)
self.assertIn("group", export.manifest["subject_types"])
self.assertTrue(
any(
fact.subject_type == "group"
and fact.subject_id == "group:research"
and fact.scope_id == "app.demo"
for fact in export.facts
)
)
def test_service_specific_projection_filters_other_services(self):
service, store, _ = _service()
session = _bootstrap(service)
service.add_membership(
session.actor,
session.user.user_id,
tenant="tenant:coulomb",
scope_type="realm",
scope_id="realm:citadel",
kind="operator",
correlation_id="corr-realm-membership",
)
store.save_identity_factor(_email_factor(session.user.user_id))
profile = service.register_access_profile(
session.actor,
_realm_operator_profile(),
correlation_id="corr-profile-register",
)
service.select_active_hat(
session.actor,
session.user.user_id,
profile.access_profile_id,
correlation_id="corr-select-hat",
)
projection = service.projection(
session.actor,
session.user.user_id,
ProjectionType.CLAIMS_ENRICHMENT,
application_id="app.other",
tenant="tenant:coulomb",
correlation_id="corr-other-service",
)
self.assertEqual(projection.access_context, {})
def test_access_profile_diagnostics_are_redacted(self):
service, _, _ = _service()
session = _bootstrap(service)
profile = service.register_access_profile(
session.actor,
AccessProfile(
tenant="tenant:coulomb",
display_name="Sensitive Defaults",
hat="operator",
membership_requirements=(
AccessMembershipRequirement(
scope_type="realm",
scope_id="realm:citadel",
kind="operator",
),
),
profile_defaults={"internal_hint": "secret-default"},
claims={"private_claim": "secret-claim"},
),
correlation_id="corr-sensitive-profile",
)
diagnostics = service.access_profile_diagnostics(
session.actor,
tenant="tenant:coulomb",
correlation_id="corr-access-diagnostics",
)
self.assertEqual(diagnostics.profile_count, 1)
self.assertIn(profile.access_profile_id, diagnostics.required_factor_types)
self.assertNotIn("secret-default", repr(diagnostics))
self.assertNotIn("secret-claim", repr(diagnostics))
self.assertNotIn(
"secret-default",
repr([event.payload for event in service.outbox_events()]),
)
self.assertNotIn(
"secret-claim",
repr([event.payload for event in service.outbox_events()]),
)
def _service():
store = InMemoryUserEngineStore()
authz = LocalAuthorizationCheckPort()
service = UserEngineService(
store=store,
identity_adapter=FixtureIdentityClaimsAdapter(),
authorization=authz,
)
return service, store, authz
def _bootstrap(service: UserEngineService):
session = service.me(_claims(), correlation_id="corr-me")
service.register_application(
session.actor,
sample_application(),
binding=sample_application_binding(),
correlation_id="corr-app",
)
service.publish_catalog(
session.actor,
sample_catalog(),
correlation_id="corr-catalog",
)
return session
def _claims():
claims = human_actor_claims(subject="ada", tenant="tenant:coulomb")
claims["roles"] = ["tenant-admin"]
claims["email"] = "ada@example.test"
return claims
def _email_factor(user_id: str) -> IdentityFactor:
return IdentityFactor(
factor_type=IdentityFactorType.EMAIL,
normalized_value="ada@example.test",
user_id=user_id,
display_value="ada@example.test",
source_system="fixture-email",
)
def _realm_operator_profile() -> AccessProfile:
return AccessProfile(
tenant="tenant:coulomb",
display_name="Realm Operator",
hat="operator",
scope_type=AccessScopeType.REALM,
scope_id="realm:citadel",
realm_id="realm:citadel",
service_id="app.demo",
asset_id="asset:ledger",
membership_requirements=(
AccessMembershipRequirement(
scope_type="realm",
scope_id="realm:citadel",
kind="operator",
),
),
required_factor_types=(IdentityFactorType.EMAIL,),
profile_defaults={"workspace_mode": "ops"},
claims={"service_role": "operator"},
)
if __name__ == "__main__":
unittest.main()