# Generated from coulomb/repo-seed (311 lines, 10 KiB, Python).
import unittest
|
|
|
|
from user_engine.adapters.local import InMemoryUserEngineStore
|
|
from user_engine.domain import (
|
|
AttributeDefinition,
|
|
AuthorizationEffect,
|
|
AuthorizationRequest,
|
|
Catalog,
|
|
CatalogLifecycle,
|
|
Mutability,
|
|
ProfileScope,
|
|
ProjectionType,
|
|
Sensitivity,
|
|
Visibility,
|
|
)
|
|
from user_engine.errors import AuthorizationDenied, ConflictError, ValidationError
|
|
from user_engine.projections import ClaimsEnrichmentProjectionCache
|
|
from user_engine.service import REDACTED, UserEngineService
|
|
from user_engine.testing.fixtures import sample_application, sample_application_binding
|
|
from user_engine.testing.scenarios import (
|
|
SCENARIO_MATRIX,
|
|
ScenarioAuthorizationHarness,
|
|
StrictFixtureIdentityClaimsAdapter,
|
|
expired_claims,
|
|
human_claims,
|
|
invalid_claims,
|
|
local_issuer_claims,
|
|
missing_tenant_claims,
|
|
platform_operator_claims,
|
|
tenant_admin_claims,
|
|
)
|
|
|
|
|
|
class IntegratedScenarioTests(unittest.TestCase):
    """Scenario tests that drive ``UserEngineService`` through the fixture harness.

    Each test builds a fresh service via ``_service()`` so no state is shared
    between tests.
    """

    def test_scenario_matrix_names_expected_conformance_paths(self):
        """``SCENARIO_MATRIX`` names exactly the expected set of scenarios."""
        self.assertEqual(
            set(SCENARIO_MATRIX),
            {
                "standalone_self_service",
                "denied_access",
                "tenant_admin",
                "platform_operator",
                "cross_tenant_denial",
                "two_applications",
                "sensitive_redaction",
                "audit_event_replay",
                "family_dataspace_onboarding",
            },
        )

    def test_strict_identity_adapter_rejects_negative_identity_fixtures(self):
        """``normalize`` raises ``ValidationError`` for every negative claims fixture."""
        adapter = StrictFixtureIdentityClaimsAdapter()
        negative_fixtures = {
            "local_issuer_claims": local_issuer_claims(),
            "invalid_claims": invalid_claims(),
            "expired_claims": expired_claims(),
            "missing_tenant_claims": missing_tenant_claims(),
        }

        for fixture_name, claims in negative_fixtures.items():
            # subTest keeps checking the remaining fixtures after a failure and
            # names the offending fixture in the report.
            with self.subTest(fixture=fixture_name):
                with self.assertRaises(ValidationError):
                    adapter.normalize(claims)

    def test_authorization_harness_supports_denial_obligation_assurance_and_batch(self):
        """The harness allows, denies, attaches obligations and answers batches.

        The harness is configured to ALLOW ``profile.write`` with an
        ``audit:retain`` obligation. The same request repeated against
        ``tenant:faraday`` is expected to be denied.
        """
        service, _, authz = _service(
            action_effects={"profile.write": AuthorizationEffect.ALLOW},
            action_obligations={"profile.write": ("audit:retain",)},
        )
        session = _bootstrap(service, tenant_admin_claims())
        request = AuthorizationRequest(
            actor=session.actor,
            resource_type="user-engine:profile",
            resource_id=session.user.user_id,
            action="profile.write",
            tenant="tenant:coulomb",
            correlation_id="corr-authz",
            target_user_id=session.user.user_id,
            # Passed through to the harness; no assertion below inspects it.
            context={"required_assurance": "aal2"},
        )

        decision = authz.check(request)
        denied = authz.check(
            AuthorizationRequest(
                actor=session.actor,
                resource_type="user-engine:profile",
                resource_id=session.user.user_id,
                action="profile.write",
                tenant="tenant:faraday",
                correlation_id="corr-cross-tenant",
                target_user_id=session.user.user_id,
            )
        )
        batch = authz.batch_check((request, request))

        self.assertTrue(decision.allowed)
        self.assertEqual(decision.obligations, ("audit:retain",))
        self.assertFalse(denied.allowed)
        self.assertEqual(len(batch), 2)

    def test_full_flow_from_claims_to_authz_mutation_projection_audit_and_event(self):
        """A membership add, a profile write and a projection leave the expected traces.

        Checks that the sensitive ``demo.recovery_hint`` value is redacted in
        the application-runtime projection, that a ``membership.added`` event
        is in the outbox, and that every audit record has a correlation id.
        """
        service, _, _ = _service()
        session = _bootstrap(service, tenant_admin_claims(), catalog=_sensitive_catalog())
        service.add_membership(
            session.actor,
            session.user.user_id,
            tenant="tenant:coulomb",
            scope_type="team",
            scope_id="team:demo",
            kind="member",
            correlation_id="corr-membership",
        )

        service.set_profile_value(
            session.actor,
            session.user.user_id,
            "demo.recovery_hint",
            "first shell",
            tenant="tenant:coulomb",
            correlation_id="corr-sensitive",
        )
        projection = service.projection(
            session.actor,
            session.user.user_id,
            ProjectionType.APPLICATION_RUNTIME,
            tenant="tenant:coulomb",
            application_id="app.demo",
            correlation_id="corr-projection",
        )

        self.assertEqual(projection.values["demo.recovery_hint"], REDACTED)
        event_types = [event.event_type for event in service.outbox_events()]
        self.assertIn("membership.added", event_types)
        self.assertTrue(all(record.correlation_id for record in service.audit_records()))

    def test_cache_reuse_and_invalidation_control_projection_work(self):
        """Cache hits add no authorization requests; invalidation adds one more.

        The number of requests recorded by the harness is used as the measure
        of work done behind the cache.
        """
        service, _, authz = _service()
        session = _bootstrap(service, human_claims())
        cache = ClaimsEnrichmentProjectionCache()
        before = len(authz.requests)

        cache.get(
            service,
            session.actor,
            user_id=session.user.user_id,
            tenant="tenant:coulomb",
            application_id="app.demo",
            correlation_id="corr-cache-1",
        )
        cache.get(
            service,
            session.actor,
            user_id=session.user.user_id,
            tenant="tenant:coulomb",
            application_id="app.demo",
            correlation_id="corr-cache-2",
        )
        after_cached = len(authz.requests)
        cache.invalidate_user(session.user.user_id)
        cache.get(
            service,
            session.actor,
            user_id=session.user.user_id,
            tenant="tenant:coulomb",
            application_id="app.demo",
            correlation_id="corr-cache-3",
        )

        # Two lookups before invalidation cost a single authorization request.
        self.assertEqual(after_cached, before + 1)
        # The lookup after invalidation costs exactly one more.
        self.assertEqual(len(authz.requests), after_cached + 1)

    def test_security_negative_paths_cover_admin_overreach_and_namespace_hijack(self):
        """A tenant admin cannot act on another tenant or re-publish a taken namespace."""
        service, _, _ = _service()
        session = _bootstrap(service, tenant_admin_claims())

        # Admin overreach: the status change targets tenant:faraday.
        with self.assertRaises(AuthorizationDenied):
            service.set_tenant_account_status(
                session.actor,
                session.user.user_id,
                status=session.account.status,
                tenant="tenant:faraday",
                correlation_id="corr-overreach",
            )

        # Namespace hijack: _bootstrap already published a catalog in the
        # "demo" namespace under a different catalog_id.
        with self.assertRaises(ConflictError):
            service.publish_catalog(
                session.actor,
                Catalog(
                    catalog_id="hijack-demo",
                    namespace="demo",
                    version="0.1.0",
                    owning_application_id="app.demo",
                    lifecycle=CatalogLifecycle.ACTIVE,
                    attributes=(
                        AttributeDefinition(
                            key="demo.display_density",
                            value_type="string",
                            scope=ProfileScope.GLOBAL,
                            sensitivity=Sensitivity.INTERNAL,
                            visibility=(Visibility.APPLICATION,),
                            mutability=(Mutability.ADMIN,),
                        ),
                    ),
                ),
                correlation_id="corr-hijack",
            )

    def test_platform_operator_scenario_can_manage_target_tenant(self):
        """A platform operator can create a user and set its tenant account status."""
        service, _, _ = _service()
        session = service.me(platform_operator_claims(), correlation_id="corr-platform")
        managed = service.create_user(
            session.actor,
            display_name="Managed",
            primary_email="managed@example.test",
            correlation_id="corr-create",
        )

        account = service.set_tenant_account_status(
            session.actor,
            managed.user_id,
            # Passed by keyword to match the other call site in this class.
            status=session.account.status,
            tenant="tenant:coulomb",
            correlation_id="corr-tenant-account",
        )

        self.assertEqual(account.tenant, "tenant:coulomb")
|
|
|
|
|
|
def _service(
    *,
    action_effects: dict[str, AuthorizationEffect] | None = None,
    action_obligations: dict[str, tuple[str, ...]] | None = None,
):
    """Build a service wired to an in-memory store and a scenario authz harness.

    Both keyword arguments are forwarded unchanged to
    ``ScenarioAuthorizationHarness``.

    Returns:
        A ``(service, store, authz)`` tuple so tests can inspect the
        collaborators as well as the service.
    """
    backing_store = InMemoryUserEngineStore()
    harness = ScenarioAuthorizationHarness(
        action_effects=action_effects,
        action_obligations=action_obligations,
    )
    wiring = {
        "store": backing_store,
        "identity_adapter": StrictFixtureIdentityClaimsAdapter(),
        "authorization": harness,
    }
    return UserEngineService(**wiring), backing_store, harness
|
|
|
|
|
|
def _bootstrap(
    service: UserEngineService,
    claims: dict[str, object],
    *,
    catalog: Catalog | None = None,
):
    """Resolve a session from ``claims``, then register the sample app and a catalog.

    Args:
        service: The service under test.
        claims: Identity claims passed to ``service.me``.
        catalog: Catalog to publish; when falsy, ``_simple_catalog()`` is used.

    Returns:
        The session object returned by ``service.me``.
    """
    current = service.me(claims, correlation_id="corr-me")
    actor = current.actor

    service.register_application(
        actor,
        sample_application(),
        binding=sample_application_binding(),
        correlation_id="corr-app",
    )

    # Truthiness fallback, not an ``is None`` check.
    to_publish = catalog or _simple_catalog()
    service.publish_catalog(actor, to_publish, correlation_id="corr-catalog")

    return current
|
|
|
|
|
|
def _simple_catalog() -> Catalog:
    """Return the active ``demo`` catalog holding a single ``demo.display_density`` attribute."""
    display_density = AttributeDefinition(
        key="demo.display_density",
        value_type="string",
        scope=ProfileScope.APPLICATION,
        sensitivity=Sensitivity.INTERNAL,
        visibility=(Visibility.USER, Visibility.APPLICATION),
        mutability=(Mutability.USER,),
        default="comfortable",
        validation={"enum": ["compact", "comfortable"]},
    )
    return Catalog(
        catalog_id="demo-profile",
        namespace="demo",
        version="0.1.0",
        owning_application_id="app.demo",
        lifecycle=CatalogLifecycle.ACTIVE,
        attributes=(display_density,),
    )
|
|
|
|
|
|
def _sensitive_catalog() -> Catalog:
    """Return the simple catalog extended with a sensitive ``demo.recovery_hint`` attribute.

    Identity fields (id, namespace, version, owner, lifecycle) are copied from
    ``_simple_catalog()``; only the attribute tuple grows.
    """
    base = _simple_catalog()
    recovery_hint = AttributeDefinition(
        key="demo.recovery_hint",
        value_type="string",
        scope=ProfileScope.GLOBAL,
        sensitivity=Sensitivity.SENSITIVE,
        visibility=(Visibility.USER, Visibility.APPLICATION, Visibility.ADMIN),
        mutability=(Mutability.USER, Mutability.ADMIN),
    )
    return Catalog(
        catalog_id=base.catalog_id,
        namespace=base.namespace,
        version=base.version,
        owning_application_id=base.owning_application_id,
        lifecycle=base.lifecycle,
        attributes=(*base.attributes, recovery_hint),
    )
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the tests in this module when it is executed as a script.
    unittest.main()
|