Files
user-engine/tests/test_integrated_scenarios.py

311 lines
10 KiB
Python

import unittest
from user_engine.adapters.local import InMemoryUserEngineStore
from user_engine.domain import (
AttributeDefinition,
AuthorizationEffect,
AuthorizationRequest,
Catalog,
CatalogLifecycle,
Mutability,
ProfileScope,
ProjectionType,
Sensitivity,
Visibility,
)
from user_engine.errors import AuthorizationDenied, ConflictError, ValidationError
from user_engine.projections import ClaimsEnrichmentProjectionCache
from user_engine.service import REDACTED, UserEngineService
from user_engine.testing.fixtures import sample_application, sample_application_binding
from user_engine.testing.scenarios import (
SCENARIO_MATRIX,
ScenarioAuthorizationHarness,
StrictFixtureIdentityClaimsAdapter,
expired_claims,
human_claims,
invalid_claims,
local_issuer_claims,
missing_tenant_claims,
platform_operator_claims,
tenant_admin_claims,
)
class IntegratedScenarioTests(unittest.TestCase):
def test_scenario_matrix_names_expected_conformance_paths(self):
self.assertEqual(
set(SCENARIO_MATRIX),
{
"standalone_self_service",
"denied_access",
"tenant_admin",
"platform_operator",
"cross_tenant_denial",
"two_applications",
"sensitive_redaction",
"audit_event_replay",
"family_dataspace_onboarding",
},
)
def test_strict_identity_adapter_rejects_negative_identity_fixtures(self):
adapter = StrictFixtureIdentityClaimsAdapter()
for claims in (
local_issuer_claims(),
invalid_claims(),
expired_claims(),
missing_tenant_claims(),
):
with self.assertRaises(ValidationError):
adapter.normalize(claims)
def test_authorization_harness_supports_denial_obligation_assurance_and_batch(self):
service, _, authz = _service(
action_effects={"profile.write": AuthorizationEffect.ALLOW},
action_obligations={"profile.write": ("audit:retain",)},
)
session = _bootstrap(service, tenant_admin_claims())
request = AuthorizationRequest(
actor=session.actor,
resource_type="user-engine:profile",
resource_id=session.user.user_id,
action="profile.write",
tenant="tenant:coulomb",
correlation_id="corr-authz",
target_user_id=session.user.user_id,
context={"required_assurance": "aal2"},
)
decision = authz.check(request)
denied = authz.check(
AuthorizationRequest(
actor=session.actor,
resource_type="user-engine:profile",
resource_id=session.user.user_id,
action="profile.write",
tenant="tenant:faraday",
correlation_id="corr-cross-tenant",
target_user_id=session.user.user_id,
)
)
batch = authz.batch_check((request, request))
self.assertTrue(decision.allowed)
self.assertEqual(decision.obligations, ("audit:retain",))
self.assertFalse(denied.allowed)
self.assertEqual(len(batch), 2)
def test_full_flow_from_claims_to_authz_mutation_projection_audit_and_event(self):
service, _, _ = _service()
session = _bootstrap(service, tenant_admin_claims(), catalog=_sensitive_catalog())
service.add_membership(
session.actor,
session.user.user_id,
tenant="tenant:coulomb",
scope_type="team",
scope_id="team:demo",
kind="member",
correlation_id="corr-membership",
)
service.set_profile_value(
session.actor,
session.user.user_id,
"demo.recovery_hint",
"first shell",
tenant="tenant:coulomb",
correlation_id="corr-sensitive",
)
projection = service.projection(
session.actor,
session.user.user_id,
ProjectionType.APPLICATION_RUNTIME,
tenant="tenant:coulomb",
application_id="app.demo",
correlation_id="corr-projection",
)
self.assertEqual(projection.values["demo.recovery_hint"], REDACTED)
self.assertIn("membership.added", [event.event_type for event in service.outbox_events()])
self.assertTrue(all(record.correlation_id for record in service.audit_records()))
def test_cache_reuse_and_invalidation_control_projection_work(self):
service, _, authz = _service()
session = _bootstrap(service, human_claims())
cache = ClaimsEnrichmentProjectionCache()
before = len(authz.requests)
cache.get(
service,
session.actor,
user_id=session.user.user_id,
tenant="tenant:coulomb",
application_id="app.demo",
correlation_id="corr-cache-1",
)
cache.get(
service,
session.actor,
user_id=session.user.user_id,
tenant="tenant:coulomb",
application_id="app.demo",
correlation_id="corr-cache-2",
)
after_cached = len(authz.requests)
cache.invalidate_user(session.user.user_id)
cache.get(
service,
session.actor,
user_id=session.user.user_id,
tenant="tenant:coulomb",
application_id="app.demo",
correlation_id="corr-cache-3",
)
self.assertEqual(after_cached, before + 1)
self.assertEqual(len(authz.requests), after_cached + 1)
def test_security_negative_paths_cover_admin_overreach_and_namespace_hijack(self):
service, _, _ = _service()
session = _bootstrap(service, tenant_admin_claims())
with self.assertRaises(AuthorizationDenied):
service.set_tenant_account_status(
session.actor,
session.user.user_id,
status=session.account.status,
tenant="tenant:faraday",
correlation_id="corr-overreach",
)
with self.assertRaises(ConflictError):
service.publish_catalog(
session.actor,
Catalog(
catalog_id="hijack-demo",
namespace="demo",
version="0.1.0",
owning_application_id="app.demo",
lifecycle=CatalogLifecycle.ACTIVE,
attributes=(
AttributeDefinition(
key="demo.display_density",
value_type="string",
scope=ProfileScope.GLOBAL,
sensitivity=Sensitivity.INTERNAL,
visibility=(Visibility.APPLICATION,),
mutability=(Mutability.ADMIN,),
),
),
),
correlation_id="corr-hijack",
)
def test_platform_operator_scenario_can_manage_target_tenant(self):
service, _, _ = _service()
session = service.me(platform_operator_claims(), correlation_id="corr-platform")
managed = service.create_user(
session.actor,
display_name="Managed",
primary_email="managed@example.test",
correlation_id="corr-create",
)
account = service.set_tenant_account_status(
session.actor,
managed.user_id,
session.account.status,
tenant="tenant:coulomb",
correlation_id="corr-tenant-account",
)
self.assertEqual(account.tenant, "tenant:coulomb")
def _service(
*,
action_effects: dict[str, AuthorizationEffect] | None = None,
action_obligations: dict[str, tuple[str, ...]] | None = None,
):
store = InMemoryUserEngineStore()
authz = ScenarioAuthorizationHarness(
action_effects=action_effects,
action_obligations=action_obligations,
)
service = UserEngineService(
store=store,
identity_adapter=StrictFixtureIdentityClaimsAdapter(),
authorization=authz,
)
return service, store, authz
def _bootstrap(
service: UserEngineService,
claims: dict[str, object],
*,
catalog: Catalog | None = None,
):
session = service.me(claims, correlation_id="corr-me")
service.register_application(
session.actor,
sample_application(),
binding=sample_application_binding(),
correlation_id="corr-app",
)
service.publish_catalog(
session.actor,
catalog or _simple_catalog(),
correlation_id="corr-catalog",
)
return session
def _simple_catalog() -> Catalog:
return Catalog(
catalog_id="demo-profile",
namespace="demo",
version="0.1.0",
owning_application_id="app.demo",
lifecycle=CatalogLifecycle.ACTIVE,
attributes=(
AttributeDefinition(
key="demo.display_density",
value_type="string",
scope=ProfileScope.APPLICATION,
sensitivity=Sensitivity.INTERNAL,
visibility=(Visibility.USER, Visibility.APPLICATION),
mutability=(Mutability.USER,),
default="comfortable",
validation={"enum": ["compact", "comfortable"]},
),
),
)
def _sensitive_catalog() -> Catalog:
catalog = _simple_catalog()
return Catalog(
catalog_id=catalog.catalog_id,
namespace=catalog.namespace,
version=catalog.version,
owning_application_id=catalog.owning_application_id,
lifecycle=catalog.lifecycle,
attributes=(
*catalog.attributes,
AttributeDefinition(
key="demo.recovery_hint",
value_type="string",
scope=ProfileScope.GLOBAL,
sensitivity=Sensitivity.SENSITIVE,
visibility=(Visibility.USER, Visibility.APPLICATION, Visibility.ADMIN),
mutability=(Mutability.USER, Mutability.ADMIN),
),
),
)
if __name__ == "__main__":
unittest.main()