Files
user-engine/tests/test_registration_security_conformance.py

577 lines
20 KiB
Python

from dataclasses import replace
from datetime import timedelta
import unittest
from user_engine.adapters.local import InMemoryUserEngineStore, LocalAuthorizationCheckPort
from user_engine.domain import (
AccessMembershipRequirement,
AccessProfile,
AccessScopeType,
AccountStatus,
AttributeDefinition,
Catalog,
CatalogLifecycle,
FactorVerification,
IdentityFactorType,
Mutability,
OnboardingJourneyStatus,
OnboardingTriggerType,
PreparedEntitlement,
PreparedEntitlementKind,
PreparedFactorRequirement,
ProfileScope,
ProjectionType,
Sensitivity,
Visibility,
WelcomeProtocol,
WelcomeProtocolStep,
utc_now,
)
from user_engine.errors import AuthorizationDenied, ConflictError, ValidationError
from user_engine.service import REDACTED, UserEngineService
from user_engine.testing.fixtures import (
FixtureIdentityClaimsAdapter,
human_actor_claims,
sample_application,
sample_application_binding,
)
from user_engine.testing.scenarios import (
ScenarioAuthorizationHarness,
StrictFixtureIdentityClaimsAdapter,
missing_tenant_claims,
)
from user_engine.ui import RegistrationAccessManagementUi, UiViewport
class RegistrationSecurityConformanceTests(unittest.TestCase):
def test_full_registration_claim_hat_onboarding_ui_conformance_path(self):
service, store, _ = _service()
actor = _actor("conformance-user")
_bootstrap_application(service, actor)
service.register_welcome_protocol(
actor,
_prepared_welcome_protocol(),
correlation_id="corr-conf-protocol",
)
prepared = service.prepare_account(
actor,
tenant="tenant:coulomb",
required_factor_matches=(_email_requirement(),),
entitlements=(
PreparedEntitlement(
kind=PreparedEntitlementKind.MEMBERSHIP,
tenant="tenant:coulomb",
scope_type="realm",
scope_id="realm:citadel",
role="operator",
),
PreparedEntitlement(
kind=PreparedEntitlementKind.ONBOARDING_JOURNEY,
tenant="tenant:coulomb",
onboarding_journey="welcome-demo",
),
),
correlation_id="corr-conf-prepare",
)
registration = _complete_registration(service, actor)
claim = service.claim_prepared_account(
actor,
registration.session.registration_id,
prepared_account_id=prepared.prepared_account_id,
correlation_id="corr-conf-claim",
)
profile = service.register_access_profile(
actor,
_operator_access_profile(),
correlation_id="corr-conf-profile",
)
selection = service.select_active_hat(
actor,
registration.user.user_id,
profile.access_profile_id,
correlation_id="corr-conf-hat",
)
projection = service.projection(
actor,
registration.user.user_id,
ProjectionType.CLAIMS_ENRICHMENT,
application_id="app.demo",
tenant="tenant:coulomb",
correlation_id="corr-conf-projection",
)
context = service.identity_context(
actor,
user_id=registration.user.user_id,
tenant="tenant:coulomb",
application_id="app.demo",
correlation_id="corr-conf-context",
)
export = service.export_access_control_facts(
actor,
tenant="tenant:coulomb",
user_id=registration.user.user_id,
correlation_id="corr-conf-export",
)
ui_html = RegistrationAccessManagementUi(service).render_html(
RegistrationAccessManagementUi(service).admin_dashboard(
actor,
tenant="tenant:coulomb",
viewport=UiViewport.DESKTOP,
)
)
self.assertEqual(claim.memberships[0].kind, "operator")
self.assertEqual(selection.active_context.hat, "operator")
self.assertEqual(projection.access_context["active_hat"], "operator")
self.assertTrue(context.onboarding_journeys)
self.assertEqual(
context.onboarding_journeys[0].status,
OnboardingJourneyStatus.IN_PROGRESS,
)
self.assertIn("user", export.manifest["subject_types"])
self.assertIn(
"onboarding_journey.started",
[event.event_type for event in service.outbox_events()],
)
self.assertNotIn("sample.user@example.test", ui_html)
self.assertEqual(store.record_counts()["onboarding_journeys"], 1)
def test_security_negative_paths_fail_closed_with_audit_evidence(self):
service, store, _ = _service()
actor = _actor("security-user")
_bootstrap_application(service, actor)
registration = _complete_registration(service, actor)
other_user = service.create_user(
actor,
display_name="Other",
primary_email="other@example.test",
correlation_id="corr-other",
)
with self.assertRaises(ConflictError):
service.link_identity(
actor,
other_user.user_id,
issuer=actor.issuer,
subject=actor.subject,
correlation_id="corr-duplicate-identity",
)
with self.assertRaises(ValidationError):
service.prepare_account(
actor,
tenant="tenant:coulomb",
required_factor_matches=(
PreparedFactorRequirement(
factor_type=IdentityFactorType.EMAIL,
normalized_value=" ",
),
),
entitlements=(_membership_entitlement(),),
correlation_id="corr-weak-factor",
)
hijack = service.prepare_account(
actor,
tenant="tenant:coulomb",
required_factor_matches=(
PreparedFactorRequirement(
factor_type=IdentityFactorType.EMAIL,
normalized_value="victim@example.test",
),
),
entitlements=(_membership_entitlement(),),
correlation_id="corr-hijack-prepare",
)
with self.assertRaises(ValidationError):
service.claim_prepared_account(
actor,
registration.session.registration_id,
prepared_account_id=hijack.prepared_account_id,
correlation_id="corr-hijack-claim",
)
expired = service.prepare_account(
actor,
tenant="tenant:coulomb",
required_factor_matches=(_email_requirement("expired@example.test"),),
entitlements=(_membership_entitlement(scope_id="realm:expired"),),
expires_at=utc_now() - timedelta(days=1),
correlation_id="corr-expired-prepare",
)
with self.assertRaises(ValidationError):
service.claim_prepared_account(
actor,
registration.session.registration_id,
prepared_account_id=expired.prepared_account_id,
correlation_id="corr-expired-claim",
)
with self.assertRaises(AuthorizationDenied):
service.resolve_tenant_context(actor, "tenant:faraday")
privileged = service.prepare_account(
actor,
tenant="tenant:coulomb",
required_factor_matches=(_email_requirement(),),
entitlements=(
replace(
_membership_entitlement(scope_id="realm:privileged"),
role="admin",
requires_approval=True,
),
),
correlation_id="corr-privileged-prepare",
)
with self.assertRaises(AuthorizationDenied):
service.claim_prepared_account(
actor,
registration.session.registration_id,
prepared_account_id=privileged.prepared_account_id,
correlation_id="corr-privileged-claim",
)
access_profile = service.register_access_profile(
actor,
replace(_operator_access_profile(), requires_approval=True),
correlation_id="corr-stale-approval-profile",
)
with self.assertRaises(AuthorizationDenied):
service.select_active_hat(
actor,
registration.user.user_id,
access_profile.access_profile_id,
correlation_id="corr-stale-approval",
)
audit_summaries = [record.summary for record in service.audit_records()]
self.assertIn(
"prepared account claim denied: factor mismatch or closed",
audit_summaries,
)
self.assertIn(
"prepared account claim denied: approval required",
audit_summaries,
)
self.assertEqual(store.memberships_for_user(other_user.user_id), ())
def test_redaction_and_diagnostics_conformance(self):
service, _, _ = _service()
actor = _actor("redaction-user")
_bootstrap_application(service, actor, catalog=_sensitive_catalog())
registration = _complete_registration(service, actor)
service.set_profile_value(
actor,
registration.user.user_id,
"demo.recovery_hint",
"blue envelope",
tenant="tenant:coulomb",
correlation_id="corr-sensitive-profile",
)
service.prepare_account(
actor,
tenant="tenant:coulomb",
required_factor_matches=(_email_requirement(),),
entitlements=(_membership_entitlement(),),
primary_email="sample.user@example.test",
correlation_id="corr-redaction-prepare",
)
service.register_access_profile(
actor,
replace(
_operator_access_profile(),
claims={"policy_secret": "do-not-render"},
profile_defaults={"landing_hint": "do-not-render"},
),
correlation_id="corr-redaction-profile",
)
projection = service.projection(
actor,
registration.user.user_id,
ProjectionType.APPLICATION_RUNTIME,
application_id="app.demo",
tenant="tenant:coulomb",
correlation_id="corr-sensitive-projection",
)
access_diagnostics = service.access_profile_diagnostics(
actor,
tenant="tenant:coulomb",
correlation_id="corr-access-diagnostics",
)
admin_html = RegistrationAccessManagementUi(service).render_html(
RegistrationAccessManagementUi(service).admin_dashboard(
actor,
tenant="tenant:coulomb",
viewport=UiViewport.DESKTOP,
)
)
event_payloads = repr([event.payload for event in service.outbox_events()])
self.assertEqual(projection.values["demo.recovery_hint"], REDACTED)
self.assertNotIn("blue envelope", repr(access_diagnostics))
self.assertNotIn("do-not-render", repr(access_diagnostics))
self.assertNotIn("sample.user@example.test", admin_html)
self.assertNotIn("sample.user@example.test", event_payloads)
self.assertNotIn("blue envelope", event_payloads)
def test_adapter_conformance_harnesses_without_production_infrastructure(self):
store = InMemoryUserEngineStore()
authz = ScenarioAuthorizationHarness(
action_obligations={"access_control_facts.export": ("acl:sync",)}
)
service = UserEngineService(
store=store,
identity_adapter=StrictFixtureIdentityClaimsAdapter(),
authorization=authz,
factor_verifier=_FixtureFactorVerifier(),
)
actor = service.identity_adapter.normalize(human_actor_claims())
_bootstrap_application(service, actor)
registration = service.start_registration(actor, correlation_id="corr-adapter-start")
service.attach_registration_factor(
actor,
registration.registration_id,
{"type": "eid", "value": "EID-123", "secret": "strip-me"},
correlation_id="corr-adapter-factor",
)
service.attach_registration_factor(
actor,
registration.registration_id,
{"type": "email", "value": "sample.user@example.test"},
correlation_id="corr-adapter-email",
)
completed = service.complete_registration(
actor,
registration.registration_id,
correlation_id="corr-adapter-complete",
)
service.add_membership(
actor,
completed.user.user_id,
tenant="tenant:coulomb",
scope_type="group",
scope_id="group:research",
kind="member",
correlation_id="corr-adapter-group",
)
export = service.export_access_control_facts(
actor,
tenant="tenant:coulomb",
user_id=completed.user.user_id,
correlation_id="corr-adapter-export",
)
protocol = service.register_welcome_protocol(
actor,
WelcomeProtocol(
tenant="tenant:coulomb",
name="Adapter Handoff",
trigger_type=OnboardingTriggerType.MANUAL,
steps=(
WelcomeProtocolStep(
step_key="callback",
title="Callback",
subsystem="crm",
requires_subsystem_callback=True,
),
),
),
correlation_id="corr-adapter-protocol",
)
blocked = service.start_onboarding_journey(
actor,
completed.user.user_id,
protocol.protocol_id,
correlation_id="corr-adapter-onboarding",
).journey
resumed = service.resume_onboarding_journey(
actor,
blocked.journey_id,
callback_refs={"callback": "crm://welcome/callback"},
correlation_id="corr-adapter-resume",
)
self.assertIn("group", export.manifest["subject_types"])
self.assertTrue(any(request.action == "access_control_facts.export" for request in authz.requests))
self.assertNotIn("strip-me", repr(store.factors_for_user(completed.user.user_id)))
self.assertEqual(blocked.status, OnboardingJourneyStatus.BLOCKED)
self.assertEqual(resumed.status, OnboardingJourneyStatus.IN_PROGRESS)
self.assertTrue(service.audit_records())
self.assertTrue(service.outbox_events())
self.assertEqual(service.operability_snapshot().issues, ())
with self.assertRaises(ValidationError):
service.identity_adapter.normalize(missing_tenant_claims())
class _FixtureFactorVerifier:
def normalize(self, proofing_result):
factor_type = IdentityFactorType(str(proofing_result["type"]))
return FactorVerification(
factor_type=factor_type,
normalized_value=str(proofing_result["value"]).casefold(),
display_value=None,
source_system="fixture-proofing",
assurance={"level": "ial2" if factor_type == IdentityFactorType.EID else "ial1"},
)
def _service():
store = InMemoryUserEngineStore()
service = UserEngineService(
store=store,
identity_adapter=FixtureIdentityClaimsAdapter(),
authorization=LocalAuthorizationCheckPort(),
)
return service, store, service.authorization
def _actor(subject: str):
claims = human_actor_claims(subject=subject, tenant="tenant:coulomb")
claims["roles"] = ["tenant-admin"]
return FixtureIdentityClaimsAdapter().normalize(claims)
def _bootstrap_application(
service: UserEngineService,
actor,
*,
catalog: Catalog | None = None,
) -> None:
service.register_application(
actor,
sample_application(),
binding=sample_application_binding(),
correlation_id="corr-bootstrap-app",
)
service.publish_catalog(
actor,
catalog or _simple_catalog(),
correlation_id="corr-bootstrap-catalog",
)
def _complete_registration(service: UserEngineService, actor):
session = service.start_registration(actor, correlation_id="corr-reg-start")
service.attach_registration_factor(
actor,
session.registration_id,
FactorVerification(
factor_type=IdentityFactorType.EMAIL,
normalized_value="sample.user@example.test",
display_value="sample.user@example.test",
source_system="fixture-email",
),
correlation_id="corr-reg-factor",
)
return service.complete_registration(
actor,
session.registration_id,
correlation_id="corr-reg-complete",
)
def _email_requirement(email: str = "sample.user@example.test"):
return PreparedFactorRequirement(
factor_type=IdentityFactorType.EMAIL,
normalized_value=email,
)
def _membership_entitlement(scope_id: str = "realm:citadel"):
return PreparedEntitlement(
kind=PreparedEntitlementKind.MEMBERSHIP,
tenant="tenant:coulomb",
scope_type="realm",
scope_id=scope_id,
role="operator",
)
def _operator_access_profile() -> AccessProfile:
return AccessProfile(
tenant="tenant:coulomb",
display_name="Operator",
hat="operator",
scope_type=AccessScopeType.REALM,
scope_id="realm:citadel",
realm_id="realm:citadel",
service_id="app.demo",
membership_requirements=(
AccessMembershipRequirement(
scope_type="realm",
scope_id="realm:citadel",
kind="operator",
),
),
required_factor_types=(IdentityFactorType.EMAIL,),
claims={"service_role": "operator"},
)
def _prepared_welcome_protocol() -> WelcomeProtocol:
return WelcomeProtocol(
tenant="tenant:coulomb",
name="Prepared Welcome",
trigger_type=OnboardingTriggerType.PREPARED_ACCOUNT,
journey_key="welcome-demo",
prepared_journey="welcome-demo",
steps=(
WelcomeProtocolStep(
step_key="intro",
title="Intro",
subsystem="portal",
callback_ref="portal://welcome",
requires_subsystem_callback=True,
),
),
)
def _simple_catalog() -> Catalog:
return Catalog(
catalog_id="demo-profile",
namespace="demo",
version="0.1.0",
owning_application_id="app.demo",
lifecycle=CatalogLifecycle.ACTIVE,
attributes=(
AttributeDefinition(
key="demo.display_density",
value_type="string",
scope=ProfileScope.APPLICATION,
sensitivity=Sensitivity.INTERNAL,
visibility=(Visibility.USER, Visibility.APPLICATION),
mutability=(Mutability.USER,),
default="comfortable",
validation={"enum": ["compact", "comfortable"]},
),
),
)
def _sensitive_catalog() -> Catalog:
catalog = _simple_catalog()
return Catalog(
catalog_id=catalog.catalog_id,
namespace=catalog.namespace,
version=catalog.version,
owning_application_id=catalog.owning_application_id,
lifecycle=catalog.lifecycle,
attributes=(
*catalog.attributes,
AttributeDefinition(
key="demo.recovery_hint",
value_type="string",
scope=ProfileScope.GLOBAL,
sensitivity=Sensitivity.SENSITIVE,
visibility=(Visibility.USER, Visibility.APPLICATION, Visibility.ADMIN),
mutability=(Mutability.USER, Mutability.ADMIN),
),
),
)
if __name__ == "__main__":
unittest.main()