generated from coulomb/repo-seed
Add integrated user-engine scenarios
This commit is contained in:
158
src/user_engine/testing/scenarios.py
Normal file
158
src/user_engine/testing/scenarios.py
Normal file
@@ -0,0 +1,158 @@
|
||||
"""Reusable scenario fixtures for user-engine conformance tests."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Iterable, Mapping
|
||||
|
||||
from user_engine.domain import (
|
||||
Actor,
|
||||
AuthorizationDecision,
|
||||
AuthorizationEffect,
|
||||
AuthorizationRequest,
|
||||
PrincipalType,
|
||||
)
|
||||
from user_engine.errors import ValidationError
|
||||
from user_engine.testing.fixtures import (
|
||||
FixtureIdentityClaimsAdapter,
|
||||
human_actor_claims,
|
||||
)
|
||||
|
||||
SCENARIO_MATRIX = (
|
||||
"standalone_self_service",
|
||||
"denied_access",
|
||||
"tenant_admin",
|
||||
"platform_operator",
|
||||
"cross_tenant_denial",
|
||||
"two_applications",
|
||||
"sensitive_redaction",
|
||||
"audit_event_replay",
|
||||
)
|
||||
|
||||
|
||||
class StrictFixtureIdentityClaimsAdapter(FixtureIdentityClaimsAdapter):
|
||||
"""Fixture adapter with production-like negative checks."""
|
||||
|
||||
def normalize(self, claims: Mapping[str, object]) -> Actor:
|
||||
for required in ("iss", "sub", "tenant", "principal_type"):
|
||||
if not claims.get(required):
|
||||
raise ValidationError(f"{required} claim is required")
|
||||
issuer = str(claims.get("iss", ""))
|
||||
if issuer.startswith("local:"):
|
||||
raise ValidationError("local issuers are not accepted")
|
||||
if claims.get("expired"):
|
||||
raise ValidationError("expired claims are not accepted")
|
||||
return super().normalize(claims)
|
||||
|
||||
|
||||
class ScenarioAuthorizationHarness:
|
||||
"""Authorization harness for scenario and conformance-style tests."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
default_effect: AuthorizationEffect = AuthorizationEffect.ALLOW,
|
||||
action_effects: dict[str, AuthorizationEffect] | None = None,
|
||||
action_obligations: dict[str, tuple[str, ...]] | None = None,
|
||||
) -> None:
|
||||
self.default_effect = default_effect
|
||||
self.action_effects = action_effects or {}
|
||||
self.action_obligations = action_obligations or {}
|
||||
self.requests: list[AuthorizationRequest] = []
|
||||
|
||||
def check(self, request: AuthorizationRequest) -> AuthorizationDecision:
|
||||
self.requests.append(request)
|
||||
effect = self.action_effects.get(request.action, self.default_effect)
|
||||
if _cross_tenant_denied(request) or _assurance_denied(request):
|
||||
effect = AuthorizationEffect.DENY
|
||||
return AuthorizationDecision(
|
||||
effect=effect,
|
||||
reason="scenario",
|
||||
obligations=self.action_obligations.get(request.action, ()),
|
||||
)
|
||||
|
||||
def batch_check(
|
||||
self, requests: Iterable[AuthorizationRequest]
|
||||
) -> tuple[AuthorizationDecision, ...]:
|
||||
return tuple(self.check(request) for request in requests)
|
||||
|
||||
|
||||
def human_claims() -> dict[str, object]:
|
||||
return human_actor_claims(subject="human-user")
|
||||
|
||||
|
||||
def service_claims() -> dict[str, object]:
|
||||
claims = human_actor_claims(subject="service-client")
|
||||
claims["principal_type"] = PrincipalType.SERVICE.value
|
||||
claims["roles"] = ["service"]
|
||||
claims["azp"] = "service-client"
|
||||
return claims
|
||||
|
||||
|
||||
def agent_claims() -> dict[str, object]:
|
||||
claims = human_actor_claims(subject="agent-worker")
|
||||
claims["principal_type"] = PrincipalType.AGENT.value
|
||||
claims["roles"] = ["agent"]
|
||||
return claims
|
||||
|
||||
|
||||
def delegated_agent_claims() -> dict[str, object]:
|
||||
claims = agent_claims()
|
||||
claims["agent"] = {"delegated_by": "human-user", "purpose": "support"}
|
||||
return claims
|
||||
|
||||
|
||||
def tenant_admin_claims() -> dict[str, object]:
|
||||
claims = human_actor_claims(subject="tenant-admin")
|
||||
claims["roles"] = ["tenant-admin"]
|
||||
return claims
|
||||
|
||||
|
||||
def platform_operator_claims() -> dict[str, object]:
|
||||
claims = human_actor_claims(subject="platform-operator", tenant="platform:root")
|
||||
claims["roles"] = ["platform-operator"]
|
||||
return claims
|
||||
|
||||
|
||||
def break_glass_claims() -> dict[str, object]:
|
||||
claims = platform_operator_claims()
|
||||
claims["roles"] = ["platform-operator", "break-glass"]
|
||||
claims["assurance"] = {"level": "aal3", "methods": ["pwd", "webauthn"], "mfa": True}
|
||||
return claims
|
||||
|
||||
|
||||
def local_issuer_claims() -> dict[str, object]:
|
||||
claims = human_claims()
|
||||
claims["iss"] = "local:fixture"
|
||||
return claims
|
||||
|
||||
|
||||
def invalid_claims() -> dict[str, object]:
|
||||
claims = human_claims()
|
||||
del claims["sub"]
|
||||
return claims
|
||||
|
||||
|
||||
def expired_claims() -> dict[str, object]:
|
||||
claims = human_claims()
|
||||
claims["expired"] = True
|
||||
return claims
|
||||
|
||||
|
||||
def missing_tenant_claims() -> dict[str, object]:
|
||||
claims = human_claims()
|
||||
del claims["tenant"]
|
||||
return claims
|
||||
|
||||
|
||||
def _cross_tenant_denied(request: AuthorizationRequest) -> bool:
|
||||
return (
|
||||
request.tenant != request.actor.tenant
|
||||
and "platform-operator" not in request.actor.roles
|
||||
)
|
||||
|
||||
|
||||
def _assurance_denied(request: AuthorizationRequest) -> bool:
|
||||
required = request.context.get("required_assurance")
|
||||
if required is None:
|
||||
return False
|
||||
return str(request.actor.assurance.get("level", "")) < str(required)
|
||||
Reference in New Issue
Block a user