generated from coulomb/repo-seed
Finalize user-engine contracts and operability
This commit is contained in:
88
tests/test_finalization_contracts.py
Normal file
88
tests/test_finalization_contracts.py
Normal file
@@ -0,0 +1,88 @@
|
||||
import unittest
|
||||
|
||||
from user_engine.adapters.local import (
|
||||
InMemoryUserEngineStore,
|
||||
LocalAuthorizationCheckPort,
|
||||
)
|
||||
from user_engine.projections import ClaimsEnrichmentProjectionCache
|
||||
from user_engine.service import UserEngineService
|
||||
from user_engine.testing.fixtures import (
|
||||
FixtureIdentityClaimsAdapter,
|
||||
human_actor_claims,
|
||||
sample_application,
|
||||
sample_application_binding,
|
||||
sample_catalog,
|
||||
)
|
||||
|
||||
|
||||
class FinalizationContractTests(unittest.TestCase):
|
||||
def test_operability_snapshot_and_outbox_diagnostics_are_consistent(self):
|
||||
service = _service()
|
||||
session = _bootstrap(service)
|
||||
|
||||
snapshot = service.operability_snapshot()
|
||||
outbox = service.outbox_diagnostics()
|
||||
log_context = service.structured_log_context(
|
||||
correlation_id="corr-log",
|
||||
tenant="tenant:coulomb",
|
||||
actor=session.actor,
|
||||
)
|
||||
|
||||
self.assertTrue(snapshot.ready)
|
||||
self.assertEqual(snapshot.issues, ())
|
||||
self.assertEqual(
|
||||
snapshot.metrics["pending_outbox_events"],
|
||||
outbox.pending_count,
|
||||
)
|
||||
self.assertGreaterEqual(outbox.event_types["user.created"], 1)
|
||||
self.assertEqual(log_context["correlation_id"], "corr-log")
|
||||
self.assertEqual(log_context["actor_subject"], session.actor.subject)
|
||||
|
||||
def test_claims_enrichment_cache_reports_status(self):
|
||||
service = _service()
|
||||
session = _bootstrap(service)
|
||||
cache = ClaimsEnrichmentProjectionCache()
|
||||
|
||||
cache.get(
|
||||
service,
|
||||
session.actor,
|
||||
user_id=session.user.user_id,
|
||||
tenant="tenant:coulomb",
|
||||
application_id="app.demo",
|
||||
correlation_id="corr-cache",
|
||||
)
|
||||
status = cache.status()
|
||||
cache.invalidate_user(session.user.user_id)
|
||||
|
||||
self.assertEqual(status.entries, 1)
|
||||
self.assertEqual(status.tenants, ("tenant:coulomb",))
|
||||
self.assertEqual(status.applications, ("app.demo",))
|
||||
self.assertEqual(cache.status().entries, 0)
|
||||
|
||||
|
||||
def _service() -> UserEngineService:
|
||||
return UserEngineService(
|
||||
store=InMemoryUserEngineStore(),
|
||||
identity_adapter=FixtureIdentityClaimsAdapter(),
|
||||
authorization=LocalAuthorizationCheckPort(),
|
||||
)
|
||||
|
||||
|
||||
def _bootstrap(service: UserEngineService):
|
||||
session = service.me(human_actor_claims(), correlation_id="corr-me")
|
||||
service.register_application(
|
||||
session.actor,
|
||||
sample_application(),
|
||||
binding=sample_application_binding(),
|
||||
correlation_id="corr-app",
|
||||
)
|
||||
service.publish_catalog(
|
||||
session.actor,
|
||||
sample_catalog(),
|
||||
correlation_id="corr-catalog",
|
||||
)
|
||||
return session
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user