Files
user-engine/tests/test_finalization_contracts.py

89 lines
2.7 KiB
Python

import unittest
from user_engine.adapters.local import (
InMemoryUserEngineStore,
LocalAuthorizationCheckPort,
)
from user_engine.projections import ClaimsEnrichmentProjectionCache
from user_engine.service import UserEngineService
from user_engine.testing.fixtures import (
FixtureIdentityClaimsAdapter,
human_actor_claims,
sample_application,
sample_application_binding,
sample_catalog,
)
class FinalizationContractTests(unittest.TestCase):
def test_operability_snapshot_and_outbox_diagnostics_are_consistent(self):
service = _service()
session = _bootstrap(service)
snapshot = service.operability_snapshot()
outbox = service.outbox_diagnostics()
log_context = service.structured_log_context(
correlation_id="corr-log",
tenant="tenant:coulomb",
actor=session.actor,
)
self.assertTrue(snapshot.ready)
self.assertEqual(snapshot.issues, ())
self.assertEqual(
snapshot.metrics["pending_outbox_events"],
outbox.pending_count,
)
self.assertGreaterEqual(outbox.event_types["user.created"], 1)
self.assertEqual(log_context["correlation_id"], "corr-log")
self.assertEqual(log_context["actor_subject"], session.actor.subject)
def test_claims_enrichment_cache_reports_status(self):
service = _service()
session = _bootstrap(service)
cache = ClaimsEnrichmentProjectionCache()
cache.get(
service,
session.actor,
user_id=session.user.user_id,
tenant="tenant:coulomb",
application_id="app.demo",
correlation_id="corr-cache",
)
status = cache.status()
cache.invalidate_user(session.user.user_id)
self.assertEqual(status.entries, 1)
self.assertEqual(status.tenants, ("tenant:coulomb",))
self.assertEqual(status.applications, ("app.demo",))
self.assertEqual(cache.status().entries, 0)
def _service() -> UserEngineService:
return UserEngineService(
store=InMemoryUserEngineStore(),
identity_adapter=FixtureIdentityClaimsAdapter(),
authorization=LocalAuthorizationCheckPort(),
)
def _bootstrap(service: UserEngineService):
session = service.me(human_actor_claims(), correlation_id="corr-me")
service.register_application(
session.actor,
sample_application(),
binding=sample_application_binding(),
correlation_id="corr-app",
)
service.publish_catalog(
session.actor,
sample_catalog(),
correlation_id="corr-catalog",
)
return session
if __name__ == "__main__":
unittest.main()