# Generated from coulomb/repo-seed — Python, 188 lines, 6.3 KiB.
import unittest
|
|
from typing import Any
|
|
|
|
from user_engine.adapters.local import (
|
|
InMemoryUserEngineStore,
|
|
LocalAuthorizationCheckPort,
|
|
)
|
|
from user_engine.domain import (
|
|
AuthorizationEffect,
|
|
AuthorizationRequest,
|
|
OutboxEvent,
|
|
ProfileScope,
|
|
ProjectionType,
|
|
)
|
|
from user_engine.errors import AuthorizationDenied
|
|
from user_engine.service import UserEngineService
|
|
from user_engine.testing.fixtures import (
|
|
FixtureIdentityClaimsAdapter,
|
|
StaticAuthorizationCheckPort,
|
|
human_actor_claims,
|
|
sample_application,
|
|
sample_application_binding,
|
|
sample_catalog,
|
|
)
|
|
|
|
|
|
class PortFixtureTests(unittest.TestCase):
    """Tests for the testing fixtures, the local adapters, and the service's store usage.

    Intent is described in ``#`` comments rather than method docstrings so that
    unittest's verbose output keeps showing the test method names.
    """

    def test_fixture_identity_claims_adapter_normalizes_actor(self):
        # The canned human claims must normalize to the fixture identity.
        normalized = FixtureIdentityClaimsAdapter().normalize(human_actor_claims())

        expected_key = ("https://issuer.example.test", "user-123")
        self.assertEqual(normalized.identity_key, expected_key)
        self.assertEqual(normalized.tenant, "tenant:coulomb")
        self.assertIn("profile", normalized.scopes)

    def test_static_authorization_check_records_requests(self):
        # A static ALLOW port must allow the request and record exactly that request.
        principal = FixtureIdentityClaimsAdapter().normalize(human_actor_claims())
        port = StaticAuthorizationCheckPort(effect=AuthorizationEffect.ALLOW)
        profile_read = AuthorizationRequest(
            actor=principal,
            resource_type="user-engine:profile",
            resource_id="usr_123",
            action="read",
            tenant="tenant:coulomb",
            correlation_id="corr-1",
            target_user_id="usr_123",
        )

        outcome = port.check(profile_read)

        self.assertTrue(outcome.allowed)
        self.assertEqual(port.requests, [profile_read])

    def test_sample_application_binding_keeps_external_ids_separate(self):
        # The three identifiers on the sample binding are distinct values.
        app_binding = sample_application_binding()

        self.assertEqual(app_binding.application_id, "app.demo")
        self.assertEqual(app_binding.oidc_client_id, "demo-client")
        self.assertEqual(app_binding.protected_system_id, "user-engine.demo")

    def test_user_engine_service_consumes_store_protocol(self):
        # The proxy raises AssertionError if the service reads a blocked
        # collection field, so completing this flow shows it only used the
        # store's other attributes.
        guarded_store = _ProtocolOnlyStore(InMemoryUserEngineStore())
        service = UserEngineService(
            store=guarded_store,
            identity_adapter=FixtureIdentityClaimsAdapter(),
            authorization=StaticAuthorizationCheckPort(),
        )

        session = service.me(human_actor_claims(), correlation_id="corr-me")
        actor = session.actor
        user_id = session.user.user_id

        service.register_application(
            actor,
            sample_application(),
            binding=sample_application_binding(),
            correlation_id="corr-app",
        )
        service.publish_catalog(actor, sample_catalog(), correlation_id="corr-catalog")
        service.set_profile_value(
            actor,
            user_id,
            "demo.display_density",
            "compact",
            scope=ProfileScope.APPLICATION,
            scope_id="app.demo",
            application_id="app.demo",
            correlation_id="corr-profile",
        )
        runtime_view = service.projection(
            actor,
            user_id,
            ProjectionType.APPLICATION_RUNTIME,
            application_id="app.demo",
            correlation_id="corr-projection",
        )

        self.assertEqual(runtime_view.values["demo.display_density"], "compact")
        self.assertTrue(service.operability_snapshot().ready)

    def test_store_transaction_rolls_back_failed_mutation(self):
        # The store's outbox append always raises; after the failed call the
        # store must show no user, audit record, outbox event, or identity.
        failing_store = _FailingOutboxStore()
        service = UserEngineService(
            store=failing_store,
            identity_adapter=FixtureIdentityClaimsAdapter(),
            authorization=StaticAuthorizationCheckPort(),
        )

        with self.assertRaises(RuntimeError):
            service.me(human_actor_claims(), correlation_id="corr-fail")

        self.assertEqual(failing_store.record_counts()["users"], 0)
        self.assertEqual(failing_store.audit_log(), ())
        self.assertEqual(failing_store.pending_outbox(), ())
        leftover_identity = failing_store.find_identity(
            "https://issuer.example.test", "user-123"
        )
        self.assertIsNone(leftover_identity)

    def test_denial_audit_survives_outer_transaction_rollback(self):
        # A denied membership write inside a caller-owned transaction must
        # still leave one extra audit record, and nothing else.
        store = InMemoryUserEngineStore()
        service = UserEngineService(
            store=store,
            identity_adapter=FixtureIdentityClaimsAdapter(),
            authorization=LocalAuthorizationCheckPort(
                action_effects={"membership.write": AuthorizationEffect.DENY}
            ),
        )
        session = service.me(human_actor_claims(), correlation_id="corr-me")
        audit_before = len(store.audit_log())
        outbox_before = len(store.pending_outbox())
        membership = {
            "tenant": "tenant:coulomb",
            "scope_type": "team",
            "scope_id": "team:demo",
            "kind": "member",
            "correlation_id": "corr-denied-membership",
        }

        # Same nesting as two stacked ``with`` statements: the transaction
        # exits first, then assertRaises inspects the propagated exception.
        with self.assertRaises(AuthorizationDenied), store.transaction():
            service.add_membership(session.actor, session.user.user_id, **membership)

        self.assertEqual(len(store.audit_log()), audit_before + 1)
        self.assertEqual(store.audit_log()[-1].summary, "authorization denied")
        self.assertEqual(len(store.pending_outbox()), outbox_before)
        self.assertEqual(store.record_counts()["memberships"], 0)
|
|
|
|
|
|
class _ProtocolOnlyStore:
    """Proxy that fails if service code reaches for local collection fields.

    Attribute lookups that miss on the proxy itself are forwarded to the
    wrapped store. Names listed in ``_blocked_fields`` are never forwarded;
    reading one raises ``AssertionError`` so the offending access shows up as
    a test failure.
    """

    # Names the proxy refuses to forward. Kept immutable so one test cannot
    # widen or narrow the guard for another.
    _blocked_fields = frozenset(
        {
            "accounts",
            "applications",
            "audit_records",
            "bindings",
            "catalogs",
            "family_invitations",
            "identities",
            "identity_factors",
            "memberships",
            "outbox_events",
            "profile_values",
            "registration_sessions",
            "tenant_accounts",
            "users",
        }
    )

    def __init__(self, inner: "InMemoryUserEngineStore") -> None:
        """Wrap *inner*; all non-blocked attribute reads are delegated to it."""
        self._inner = inner

    def __getattr__(self, name: str) -> Any:
        """Forward *name* to the wrapped store unless it is a blocked field.

        Python calls this only after normal attribute lookup has failed.

        Raises:
            AttributeError: if ``_inner`` itself is missing, or the wrapped
                store has no attribute *name*.
            AssertionError: if *name* is one of ``_blocked_fields``.
        """
        # ``_inner`` reaches this hook only when it is absent from the instance
        # dict (e.g. an instance made by ``copy.copy`` or ``__new__`` before its
        # state is restored). Reading ``self._inner`` below would then re-enter
        # this method without end, so fail with a plain AttributeError instead.
        if name == "_inner":
            raise AttributeError(name)
        if name in self._blocked_fields:
            raise AssertionError(f"service accessed concrete store field {name}")
        return getattr(self._inner, name)
|
|
|
|
|
|
class _FailingOutboxStore(InMemoryUserEngineStore):
    """In-memory store variant whose outbox append always fails."""

    def append_outbox(self, event: OutboxEvent) -> None:
        """Reject *event* unconditionally by raising ``RuntimeError``."""
        failure = RuntimeError("outbox unavailable")
        raise failure
|
|
|
|
|
|
if __name__ == "__main__":
    # Executing this file directly hands control to unittest's command-line runner.
    unittest.main()
|