Start user-engine implementation scaffold

This commit is contained in:
2026-05-22 20:55:27 +02:00
parent e618b4e286
commit 58d9de26d3
14 changed files with 763 additions and 7 deletions

View File

@@ -0,0 +1 @@
"""Testing helpers and local fixtures for user-engine."""

View File

@@ -0,0 +1,151 @@
"""Local fixtures used by early user-engine tests and examples."""
from __future__ import annotations
from typing import Iterable, Mapping
from user_engine.domain import (
Actor,
Application,
ApplicationBinding,
AttributeDefinition,
AuthorizationDecision,
AuthorizationEffect,
AuthorizationRequest,
Catalog,
CatalogLifecycle,
Mutability,
PrincipalType,
ProfileScope,
ProjectionType,
Sensitivity,
Visibility,
)
from user_engine.ports import AuthorizationCheckPort, IdentityClaimsAdapter
class FixtureIdentityClaimsAdapter:
"""Normalize dictionary fixtures that already passed token verification."""
def normalize(self, claims: Mapping[str, object]) -> Actor:
scopes = claims.get("scope", ())
if isinstance(scopes, str):
scopes = tuple(part for part in scopes.split(" ") if part)
return Actor(
issuer=str(claims["iss"]),
subject=str(claims["sub"]),
tenant=str(claims["tenant"]),
principal_type=PrincipalType(str(claims["principal_type"])),
audience=tuple(_as_tuple(claims.get("aud", ()))),
roles=tuple(_as_tuple(claims.get("roles", ()))),
groups=tuple(_as_tuple(claims.get("groups", ()))),
scopes=tuple(_as_tuple(scopes)),
assurance=dict(claims.get("assurance", {})),
authorized_party=_optional_str(claims.get("azp") or claims.get("client_id")),
preferred_username=_optional_str(claims.get("preferred_username")),
claims=dict(claims),
agent=dict(claims.get("agent", {})),
)
def identity_key(self, actor: Actor) -> tuple[str, str]:
return actor.identity_key
class StaticAuthorizationCheckPort:
"""Deterministic authorization adapter for tests and local examples."""
def __init__(self, effect: AuthorizationEffect = AuthorizationEffect.ALLOW):
self.effect = effect
self.requests: list[AuthorizationRequest] = []
def check(self, request: AuthorizationRequest) -> AuthorizationDecision:
self.requests.append(request)
return AuthorizationDecision(effect=self.effect, reason="fixture")
def batch_check(
self, requests: Iterable[AuthorizationRequest]
) -> tuple[AuthorizationDecision, ...]:
return tuple(self.check(request) for request in requests)
def human_actor_claims(
*,
issuer: str = "https://issuer.example.test",
subject: str = "user-123",
tenant: str = "tenant:coulomb",
) -> dict[str, object]:
return {
"iss": issuer,
"sub": subject,
"aud": ["user-engine"],
"tenant": tenant,
"principal_type": "human",
"groups": ["tenant:coulomb:users"],
"roles": ["user"],
"scope": "openid profile email",
"preferred_username": "sample.user",
"email": "sample.user@example.test",
"assurance": {
"level": "aal2",
"methods": ["pwd", "otp"],
"mfa": True,
"source": "fixture",
},
}
def sample_application() -> Application:
return Application(
application_id="app.demo",
display_name="Demo Application",
owner="team:demo",
allowed_profile_scopes=(ProfileScope.GLOBAL, ProfileScope.APPLICATION),
allowed_projection_types=(ProjectionType.APPLICATION_RUNTIME,),
)
def sample_application_binding() -> ApplicationBinding:
return ApplicationBinding(
application_id="app.demo",
oidc_client_id="demo-client",
protected_system_id="user-engine.demo",
catalog_namespaces=("demo",),
event_source="user-engine.demo",
deployment_ref="local",
)
def sample_catalog() -> Catalog:
return Catalog(
catalog_id="demo-profile",
namespace="demo",
version="0.1.0",
owning_application_id="app.demo",
lifecycle=CatalogLifecycle.ACTIVE,
attributes=(
AttributeDefinition(
key="demo.display_density",
value_type="string",
scope=ProfileScope.APPLICATION,
sensitivity=Sensitivity.INTERNAL,
visibility=(Visibility.USER, Visibility.APPLICATION),
mutability=(Mutability.USER,),
default="comfortable",
validation={"enum": ["compact", "comfortable"]},
),
),
)
def _as_tuple(value: object) -> tuple[str, ...]:
if value is None:
return ()
if isinstance(value, str):
return (value,)
return tuple(str(item) for item in value)
def _optional_str(value: object) -> str | None:
if value is None:
return None
return str(value)