generated from coulomb/repo-seed
355 lines
12 KiB
Python
355 lines
12 KiB
Python
import unittest
|
|
|
|
from user_engine.adapters.local import InMemoryUserEngineStore, LocalAuthorizationCheckPort
|
|
from user_engine.domain import (
|
|
AccessMembershipRequirement,
|
|
AccessProfile,
|
|
AccessScopeType,
|
|
IdentityFactor,
|
|
IdentityFactorType,
|
|
ProjectionType,
|
|
)
|
|
from user_engine.errors import AuthorizationDenied, ValidationError
|
|
from user_engine.service import UserEngineService
|
|
from user_engine.testing.fixtures import (
|
|
FixtureIdentityClaimsAdapter,
|
|
human_actor_claims,
|
|
sample_application,
|
|
sample_application_binding,
|
|
sample_catalog,
|
|
)
|
|
|
|
|
|
class AccessProfileTests(unittest.TestCase):
    """Exercise access-profile registration, hat selection, projections and diagnostics.

    Every test builds its own service through ``_service()`` and bootstraps a
    session for the fixture actor through ``_bootstrap()``.
    """

    @staticmethod
    def _grant_realm_operator_membership(service, session):
        """Add the session user as ``operator`` of ``realm:citadel`` in ``tenant:coulomb``."""
        service.add_membership(
            session.actor,
            session.user.user_id,
            tenant="tenant:coulomb",
            scope_type="realm",
            scope_id="realm:citadel",
            kind="operator",
            correlation_id="corr-realm-membership",
        )

    def test_select_active_hat_updates_identity_context_and_projection(self):
        """Selecting a hat shows up in the selection, identity context and projection."""
        service, store, _ = _service()
        session = _bootstrap(service)
        self._grant_realm_operator_membership(service, session)
        # The realm-operator profile requires an EMAIL factor, so store one first.
        store.save_identity_factor(_email_factor(session.user.user_id))
        profile = service.register_access_profile(
            session.actor,
            _realm_operator_profile(),
            correlation_id="corr-profile-register",
        )

        selection = service.select_active_hat(
            session.actor,
            session.user.user_id,
            profile.access_profile_id,
            correlation_id="corr-select-hat",
        )
        context = service.identity_context(
            session.actor,
            application_id="app.demo",
            include_profile=True,
            correlation_id="corr-identity-context",
        )
        projection = service.projection(
            session.actor,
            session.user.user_id,
            ProjectionType.CLAIMS_ENRICHMENT,
            application_id="app.demo",
            tenant="tenant:coulomb",
            correlation_id="corr-projection",
        )

        # The selection carries the scope values declared on the profile.
        self.assertEqual(selection.active_context.hat, "operator")
        self.assertEqual(selection.active_context.realm_id, "realm:citadel")
        self.assertEqual(selection.active_context.service_id, "app.demo")
        self.assertEqual(selection.active_context.asset_id, "asset:ledger")
        self.assertTrue(selection.active_context.factor_ids[0].startswith("fac_"))

        # The identity context reports the same active hat and its references.
        self.assertEqual(context.active_access_context.hat, "operator")
        self.assertEqual(context.entity_refs["active_hat"].concept, "Hat")
        self.assertIn(
            "wears_hat",
            {relationship.relationship_type for relationship in context.relationship_refs},
        )
        self.assertTrue(
            any(fact.scope_id == "realm:citadel" for fact in context.access_control_facts)
        )

        # The claims-enrichment projection exposes the hat and the profile claims.
        self.assertEqual(projection.access_context["active_hat"], "operator")
        self.assertEqual(projection.access_context["claims"]["service_role"], "operator")

        # The raw email address must not appear in any outbox event payload.
        self.assertNotIn(
            "ada@example.test",
            repr([event.payload for event in service.outbox_events()]),
        )

    def test_cross_tenant_access_profile_denied(self):
        """Registering a ``tenant:faraday`` profile with the fixture actor is denied."""
        service, _, _ = _service()
        session = _bootstrap(service)

        # The fixture claims are issued for tenant:coulomb, not tenant:faraday.
        with self.assertRaises(AuthorizationDenied):
            service.register_access_profile(
                session.actor,
                AccessProfile(
                    tenant="tenant:faraday",
                    display_name="Faraday Operator",
                    hat="operator",
                    membership_requirements=(
                        AccessMembershipRequirement(
                            scope_type="realm",
                            scope_id="realm:faraday",
                            kind="operator",
                        ),
                    ),
                ),
                correlation_id="corr-cross-tenant",
            )

    def test_missing_factor_assurance_fails_closed(self):
        """A profile requiring an EID factor cannot be selected and leaves no active context."""
        service, store, _ = _service()
        session = _bootstrap(service)
        self._grant_realm_operator_membership(service, session)
        profile = service.register_access_profile(
            session.actor,
            AccessProfile(
                tenant="tenant:coulomb",
                display_name="High Assurance Operator",
                hat="operator",
                scope_type=AccessScopeType.REALM,
                scope_id="realm:citadel",
                realm_id="realm:citadel",
                membership_requirements=(
                    AccessMembershipRequirement(
                        scope_type="realm",
                        scope_id="realm:citadel",
                        kind="operator",
                    ),
                ),
                required_factor_types=(IdentityFactorType.EID,),
            ),
            correlation_id="corr-high-assurance",
        )

        # This test never saves an EID factor, so the selection must be rejected.
        with self.assertRaises(ValidationError):
            service.select_active_hat(
                session.actor,
                session.user.user_id,
                profile.access_profile_id,
                correlation_id="corr-select-missing-factor",
            )

        # Fail closed: the rejected selection must not have stored a context.
        self.assertIsNone(
            store.active_access_context(session.user.user_id, "tenant:coulomb")
        )

    def test_group_derived_access_exports_group_facts(self):
        """A hat backed by group membership exports a group-subject fact for its service."""
        service, _, _ = _service()
        session = _bootstrap(service)
        service.add_membership(
            session.actor,
            session.user.user_id,
            tenant="tenant:coulomb",
            scope_type="group",
            scope_id="group:research",
            kind="member",
            correlation_id="corr-group-membership",
        )
        profile = service.register_access_profile(
            session.actor,
            AccessProfile(
                tenant="tenant:coulomb",
                display_name="Research Service Hat",
                hat="researcher",
                scope_type=AccessScopeType.SERVICE,
                scope_id="app.demo",
                service_id="app.demo",
                membership_requirements=(
                    AccessMembershipRequirement(
                        scope_type="group",
                        scope_id="group:research",
                        kind="member",
                    ),
                ),
                group_scope_ids=("group:research",),
            ),
            correlation_id="corr-research-profile",
        )
        service.select_active_hat(
            session.actor,
            session.user.user_id,
            profile.access_profile_id,
            correlation_id="corr-select-researcher",
        )

        export = service.export_access_control_facts(
            session.actor,
            tenant="tenant:coulomb",
            user_id=session.user.user_id,
            correlation_id="corr-export-facts",
        )

        self.assertIn("group", export.manifest["subject_types"])
        self.assertTrue(
            any(
                fact.subject_type == "group"
                and fact.subject_id == "group:research"
                and fact.scope_id == "app.demo"
                for fact in export.facts
            )
        )

    def test_service_specific_projection_filters_other_services(self):
        """A projection for ``app.other`` has an empty access context for an ``app.demo`` hat."""
        service, store, _ = _service()
        session = _bootstrap(service)
        self._grant_realm_operator_membership(service, session)
        store.save_identity_factor(_email_factor(session.user.user_id))
        profile = service.register_access_profile(
            session.actor,
            _realm_operator_profile(),
            correlation_id="corr-profile-register",
        )
        service.select_active_hat(
            session.actor,
            session.user.user_id,
            profile.access_profile_id,
            correlation_id="corr-select-hat",
        )

        # The selected profile names service_id "app.demo"; ask for another app.
        projection = service.projection(
            session.actor,
            session.user.user_id,
            ProjectionType.CLAIMS_ENRICHMENT,
            application_id="app.other",
            tenant="tenant:coulomb",
            correlation_id="corr-other-service",
        )

        self.assertEqual(projection.access_context, {})

    def test_access_profile_diagnostics_are_redacted(self):
        """Profile defaults and claim values stay out of diagnostics and outbox payloads."""
        service, _, _ = _service()
        session = _bootstrap(service)
        profile = service.register_access_profile(
            session.actor,
            AccessProfile(
                tenant="tenant:coulomb",
                display_name="Sensitive Defaults",
                hat="operator",
                membership_requirements=(
                    AccessMembershipRequirement(
                        scope_type="realm",
                        scope_id="realm:citadel",
                        kind="operator",
                    ),
                ),
                profile_defaults={"internal_hint": "secret-default"},
                claims={"private_claim": "secret-claim"},
            ),
            correlation_id="corr-sensitive-profile",
        )

        diagnostics = service.access_profile_diagnostics(
            session.actor,
            tenant="tenant:coulomb",
            correlation_id="corr-access-diagnostics",
        )

        self.assertEqual(diagnostics.profile_count, 1)
        self.assertIn(profile.access_profile_id, diagnostics.required_factor_types)
        self.assertNotIn("secret-default", repr(diagnostics))
        self.assertNotIn("secret-claim", repr(diagnostics))

        # Render the outbox payloads once so both checks inspect the same snapshot.
        outbox_payloads = repr([event.payload for event in service.outbox_events()])
        self.assertNotIn("secret-default", outbox_payloads)
        self.assertNotIn("secret-claim", outbox_payloads)
|
|
|
|
|
|
def _service():
    """Build a ``UserEngineService`` wired to in-memory/local test adapters.

    Returns:
        A ``(service, store, authorization)`` tuple so tests can reach the
        backing store and the authorization port directly.
    """
    backing_store = InMemoryUserEngineStore()
    authorization_port = LocalAuthorizationCheckPort()
    engine = UserEngineService(
        store=backing_store,
        identity_adapter=FixtureIdentityClaimsAdapter(),
        authorization=authorization_port,
    )
    return engine, backing_store, authorization_port
|
|
|
|
|
|
def _bootstrap(service: UserEngineService):
    """Open a session for the fixture actor and seed the sample application and catalog.

    Calls ``service.me`` with the claims from ``_claims()``, registers the
    sample application together with its binding, publishes the sample
    catalog, and returns the session obtained from ``service.me``.
    """
    opened = service.me(_claims(), correlation_id="corr-me")

    # Register the demo application along with its binding.
    application = sample_application()
    binding = sample_application_binding()
    service.register_application(
        opened.actor,
        application,
        binding=binding,
        correlation_id="corr-app",
    )

    # Publish the sample catalog as the same actor.
    catalog = sample_catalog()
    service.publish_catalog(opened.actor, catalog, correlation_id="corr-catalog")

    return opened
|
|
|
|
|
|
def _claims():
    """Return fixture claims for subject ``ada`` in ``tenant:coulomb``.

    The claims from ``human_actor_claims`` are extended with a
    ``tenant-admin`` role and an email address.
    """
    actor_claims = human_actor_claims(subject="ada", tenant="tenant:coulomb")
    overrides = {
        "roles": ["tenant-admin"],
        "email": "ada@example.test",
    }
    # Apply the overrides in declaration order: roles first, then email.
    for claim_name, claim_value in overrides.items():
        actor_claims[claim_name] = claim_value
    return actor_claims
|
|
|
|
|
|
def _email_factor(user_id: str) -> IdentityFactor:
    """Build an EMAIL identity factor for ``user_id`` using the fixture address.

    The same address is used for both the normalized and the display value.
    """
    address = "ada@example.test"
    factor = IdentityFactor(
        factor_type=IdentityFactorType.EMAIL,
        normalized_value=address,
        user_id=user_id,
        display_value=address,
        source_system="fixture-email",
    )
    return factor
|
|
|
|
|
|
def _realm_operator_profile() -> AccessProfile:
    """Build the ``operator`` access profile scoped to ``realm:citadel``.

    The profile names service ``app.demo`` and asset ``asset:ledger``,
    requires an ``operator`` membership on the realm and an EMAIL identity
    factor, and carries one profile default and one claim.
    """
    realm = "realm:citadel"
    hat_name = "operator"

    # Single membership requirement: operator of the realm.
    operator_membership = AccessMembershipRequirement(
        scope_type="realm",
        scope_id=realm,
        kind=hat_name,
    )

    return AccessProfile(
        tenant="tenant:coulomb",
        display_name="Realm Operator",
        hat=hat_name,
        scope_type=AccessScopeType.REALM,
        scope_id=realm,
        realm_id=realm,
        service_id="app.demo",
        asset_id="asset:ledger",
        membership_requirements=(operator_membership,),
        required_factor_types=(IdentityFactorType.EMAIL,),
        profile_defaults={"workspace_mode": "ops"},
        claims={"service_role": hat_name},
    )
|
|
|
|
|
|
# Run the test suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|