feat: add durable store conformance harness

This commit is contained in:
2026-06-16 00:20:29 +02:00
parent 2ceecf6463
commit 886874d0f6
10 changed files with 937 additions and 7 deletions

View File

@@ -231,6 +231,21 @@ adapter or provider concerns outside the domain service.
## Migration Contract ## Migration Contract
The isolated store exposes `SCHEMA_VERSION = 0001_initial` and a `migrate` `user_engine.migrations` exposes the ordered durable-store manifest,
hook. Database-backed stores must expose equivalent readiness semantics before `LATEST_SCHEMA_VERSION`, logical record types, and adapter-neutral diagnostic
they are accepted by platform adapters. count keys. The isolated store's `SCHEMA_VERSION` is derived from that manifest
and its `migrate` hook must be idempotent. Database-backed stores must expose
equivalent readiness semantics before they are accepted by platform adapters.
Provider-backed Postgres adapters can use
`migrations/postgres/0001_user_engine_store.sql` as the bootstrap contract or
translate it into their own migration framework while preserving schema-version
tracking, logical record uniqueness, audit durability, and pending-outbox
reads.
Future adapters should run
`user_engine.testing.assert_user_engine_store_conformance(testcase, factory)`
with a factory that returns a fresh store. The harness covers readiness,
idempotent migration, core save/read/query behavior, transaction rollback,
outbox ordering, and diagnostics that expose counts without raw factor or
profile values.

View File

@@ -13,6 +13,7 @@ src/user_engine/
adapters/ local standalone adapters and deterministic test doubles adapters/ local standalone adapters and deterministic test doubles
domain/ transport- and persistence-neutral domain schemas domain/ transport- and persistence-neutral domain schemas
errors.py typed service exceptions for callers and future transports errors.py typed service exceptions for callers and future transports
migrations.py ordered durable-store migration manifest
ports.py adapter protocols for identity, authorization, events, audit, ports.py adapter protocols for identity, authorization, events, audit,
membership export, application bindings, and secrets membership export, application bindings, and secrets
service.py headless service API for the isolated MVP service.py headless service API for the isolated MVP
@@ -50,8 +51,11 @@ The initial headless API is `UserEngineService`. It exposes health,
readiness, `me`, user/account lifecycle, identity linking, application readiness, `me`, user/account lifecycle, identity linking, application
registration, catalog publication, profile writes, effective profile registration, catalog publication, profile writes, effective profile
resolution, projections, audit inspection, and outbox inspection. The first resolution, projections, audit inspection, and outbox inspection. The first
store is `InMemoryUserEngineStore`, which carries an explicit schema version store is `InMemoryUserEngineStore`, which carries a schema version from
and migration hook so later database-backed stores have a contract to match. `user_engine.migrations` and a migration hook so later database-backed stores
have a contract to match. Future store adapters should run
`user_engine.testing.assert_user_engine_store_conformance` with their own
factory before being wired into service tests.
## Tenant Surface ## Tenant Surface

View File

@@ -282,10 +282,16 @@ The first consumer-side follow-up is complete: `UserEngineStore` defines the
adapter boundary and the in-memory store acts as the reference implementation adapter boundary and the in-memory store acts as the reference implementation
for service-level behavior. for service-level behavior.
USER-WP-0016 adds the next consumer-side slice: `user_engine.migrations`
declares the ordered migration manifest and latest schema version,
`migrations/postgres/0001_user_engine_store.sql` defines a provider-facing
bootstrap schema, and `user_engine.testing.store_conformance` exposes a
reusable harness that future adapters can run with their own store factory.
The standard local suite runs that harness against `InMemoryUserEngineStore`.
Likely future follow-up work should be: Likely future follow-up work should be:
- Add a Postgres adapter behind the existing store boundary. - Add a Postgres adapter behind the existing store boundary.
- Add migration files for user-engine tables.
- Add provider-backed conformance tests for locking, uniqueness races, - Add provider-backed conformance tests for locking, uniqueness races,
migration readiness, outbox claiming, redacted diagnostics, and restore migration readiness, outbox claiming, redacted diagnostics, and restore
validation. validation.

View File

@@ -0,0 +1,78 @@
-- USER-WP-0016 durable-store bootstrap for provider-backed Postgres adapters.
-- Provider repositories may apply this file directly or translate it into
-- their migration framework while preserving the table semantics.
CREATE TABLE IF NOT EXISTS user_engine_schema_versions (
version text PRIMARY KEY,
name text NOT NULL,
applied_at timestamptz NOT NULL DEFAULT now(),
checksum text
);
CREATE TABLE IF NOT EXISTS user_engine_records (
record_type text NOT NULL,
record_key text NOT NULL,
tenant text,
user_id text,
application_id text,
scope_type text,
scope_id text,
payload jsonb NOT NULL,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (record_type, record_key)
);
CREATE INDEX IF NOT EXISTS user_engine_records_tenant_idx
ON user_engine_records (tenant, record_type);
CREATE INDEX IF NOT EXISTS user_engine_records_user_idx
ON user_engine_records (user_id, record_type);
CREATE INDEX IF NOT EXISTS user_engine_records_application_idx
ON user_engine_records (application_id, record_type);
CREATE INDEX IF NOT EXISTS user_engine_records_scope_idx
ON user_engine_records (scope_type, scope_id, record_type);
CREATE TABLE IF NOT EXISTS user_engine_audit_records (
audit_id text PRIMARY KEY,
tenant text NOT NULL,
actor_issuer text NOT NULL,
actor_subject text NOT NULL,
action text NOT NULL,
subject text NOT NULL,
correlation_id text NOT NULL,
summary text,
payload jsonb NOT NULL,
recorded_at timestamptz NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS user_engine_audit_records_tenant_idx
ON user_engine_audit_records (tenant, recorded_at);
CREATE INDEX IF NOT EXISTS user_engine_audit_records_subject_idx
ON user_engine_audit_records (subject, recorded_at);
CREATE TABLE IF NOT EXISTS user_engine_outbox_events (
event_id text PRIMARY KEY,
tenant text NOT NULL,
event_type text NOT NULL,
aggregate_id text NOT NULL,
correlation_id text NOT NULL,
payload jsonb NOT NULL,
occurred_at timestamptz NOT NULL DEFAULT now(),
claimed_at timestamptz,
claimed_by text,
delivered_at timestamptz,
failed_at timestamptz,
failure_reason text
);
CREATE INDEX IF NOT EXISTS user_engine_outbox_events_pending_idx
ON user_engine_outbox_events (occurred_at)
WHERE claimed_at IS NULL AND delivered_at IS NULL;
INSERT INTO user_engine_schema_versions (version, name)
VALUES ('0001_initial', 'initial durable store')
ON CONFLICT (version) DO NOTHING;

View File

@@ -32,8 +32,9 @@ from user_engine.domain import (
User, User,
WelcomeProtocol, WelcomeProtocol,
) )
from user_engine.migrations import LATEST_SCHEMA_VERSION
SCHEMA_VERSION = "0001_initial" SCHEMA_VERSION = LATEST_SCHEMA_VERSION
@dataclass @dataclass
@@ -332,6 +333,7 @@ class InMemoryUserEngineStore:
"tenant_accounts": len(self.tenant_accounts), "tenant_accounts": len(self.tenant_accounts),
"memberships": len(self.memberships), "memberships": len(self.memberships),
"applications": len(self.applications), "applications": len(self.applications),
"bindings": len(self.bindings),
"catalogs": len(self.catalogs), "catalogs": len(self.catalogs),
"family_invitations": len(self.family_invitations), "family_invitations": len(self.family_invitations),
"registration_sessions": len(self.registration_sessions), "registration_sessions": len(self.registration_sessions),

View File

@@ -0,0 +1,124 @@
"""Migration manifest for durable user-engine store adapters."""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import PurePosixPath
@dataclass(frozen=True)
class MigrationStep:
"""One ordered durable-store migration known to user-engine."""
version: str
name: str
description: str
record_types: tuple[str, ...]
sql_path: str | None = None
USER_ENGINE_STORE_RECORD_TYPES = (
"users",
"accounts",
"external_identities",
"tenant_accounts",
"memberships",
"applications",
"application_bindings",
"catalogs",
"family_invitations",
"registration_sessions",
"identity_factors",
"prepared_accounts",
"access_profiles",
"active_access_contexts",
"welcome_protocols",
"onboarding_journeys",
"profile_values",
"audit_records",
"outbox_events",
)
USER_ENGINE_RECORD_COUNT_KEYS = (
"users",
"accounts",
"tenant_accounts",
"memberships",
"applications",
"bindings",
"catalogs",
"family_invitations",
"registration_sessions",
"identity_factors",
"prepared_accounts",
"access_profiles",
"active_access_contexts",
"welcome_protocols",
"onboarding_journeys",
"profile_values",
"audit_records",
"pending_outbox_events",
)
USER_ENGINE_MIGRATIONS = (
MigrationStep(
version="0001_initial",
name="initial durable store",
description=(
"Create the schema-version ledger, logical record table, audit "
"log table, and pending outbox table used by user-engine store "
"adapters."
),
record_types=USER_ENGINE_STORE_RECORD_TYPES,
sql_path="migrations/postgres/0001_user_engine_store.sql",
),
)
LATEST_SCHEMA_VERSION = USER_ENGINE_MIGRATIONS[-1].version
def migration_manifest() -> tuple[MigrationStep, ...]:
"""Return the ordered durable-store migration manifest."""
return USER_ENGINE_MIGRATIONS
def validate_migration_manifest(
migrations: tuple[MigrationStep, ...] = USER_ENGINE_MIGRATIONS,
) -> tuple[str, ...]:
"""Return manifest validation errors without touching provider resources."""
errors: list[str] = []
if not migrations:
return ("migration manifest must not be empty",)
seen_versions: set[str] = set()
previous_version = ""
for migration in migrations:
if not migration.version:
errors.append("migration version must not be empty")
if migration.version in seen_versions:
errors.append(f"duplicate migration version {migration.version}")
if previous_version and migration.version <= previous_version:
errors.append(
f"migration {migration.version} must sort after {previous_version}"
)
if not migration.name:
errors.append(f"migration {migration.version} must have a name")
if not migration.record_types:
errors.append(
f"migration {migration.version} must declare logical record types"
)
if len(set(migration.record_types)) != len(migration.record_types):
errors.append(f"migration {migration.version} has duplicate record types")
if migration.sql_path is not None:
path = PurePosixPath(migration.sql_path)
if path.is_absolute() or ".." in path.parts:
errors.append(
f"migration {migration.version} sql_path must stay repo-relative"
)
if path.suffix != ".sql":
errors.append(
f"migration {migration.version} sql_path must reference SQL"
)
seen_versions.add(migration.version)
previous_version = migration.version
return tuple(errors)

View File

@@ -1 +1,11 @@
"""Testing helpers and local fixtures for user-engine.""" """Testing helpers and local fixtures for user-engine."""
from user_engine.testing.store_conformance import (
assert_user_engine_migration_contract,
assert_user_engine_store_conformance,
)
__all__ = [
"assert_user_engine_migration_contract",
"assert_user_engine_store_conformance",
]

View File

@@ -0,0 +1,522 @@
"""Reusable conformance checks for user-engine store adapters."""
from __future__ import annotations
from collections.abc import Callable
from typing import Any
from unittest import TestCase
from user_engine.domain import (
Account,
AccountStatus,
AccessMembershipRequirement,
AccessProfile,
AccessScopeType,
ActiveAccessContext,
Actor,
Application,
ApplicationBinding,
AttributeDefinition,
AuditRecord,
Catalog,
CatalogLifecycle,
ExternalIdentity,
FamilyInvitation,
IdentityFactor,
IdentityFactorType,
InvitationStatus,
Membership,
Mutability,
OnboardingJourney,
OnboardingStep,
OnboardingTriggerType,
OutboxEvent,
PreparedAccount,
PreparedEntitlement,
PreparedEntitlementKind,
PreparedFactorRequirement,
PrincipalType,
ProfileScope,
ProfileValue,
ProjectionType,
RegistrationSession,
RegistrationStatus,
Sensitivity,
TenantAccount,
User,
Visibility,
WelcomeProtocol,
WelcomeProtocolStep,
)
from user_engine.migrations import (
LATEST_SCHEMA_VERSION,
USER_ENGINE_RECORD_COUNT_KEYS,
migration_manifest,
validate_migration_manifest,
)
from user_engine.ports import UserEngineStore
StoreFactory = Callable[[], UserEngineStore]
TENANT = "tenant:store-conformance"
USER_ID = "usr_store_conformance"
RAW_FACTOR_VALUE = "store.user@example.test"
PROFILE_SECRET_VALUE = "quiet-secret-profile-value"
def assert_user_engine_migration_contract(testcase: TestCase) -> None:
"""Assert the migration manifest is ordered and provider-safe."""
testcase.assertEqual(validate_migration_manifest(), ())
manifest = migration_manifest()
testcase.assertGreaterEqual(len(manifest), 1)
testcase.assertEqual(manifest[-1].version, LATEST_SCHEMA_VERSION)
testcase.assertEqual(
manifest[0].sql_path,
"migrations/postgres/0001_user_engine_store.sql",
)
def assert_user_engine_store_conformance(
testcase: TestCase,
store_factory: StoreFactory,
) -> None:
"""Run the core durable-store behavior contract for one store factory."""
assert_user_engine_migration_contract(testcase)
_assert_readiness_contract(testcase, store_factory)
_assert_save_read_and_query_contract(testcase, store_factory)
_assert_transaction_rollback_contract(testcase, store_factory)
_assert_outbox_ordering_contract(testcase, store_factory)
_assert_diagnostics_contract(testcase, store_factory)
def _assert_readiness_contract(testcase: TestCase, store_factory: StoreFactory) -> None:
store = store_factory()
if store.schema_version is None:
testcase.assertFalse(store.ready)
store.migrate()
testcase.assertTrue(store.ready)
testcase.assertEqual(store.schema_version, LATEST_SCHEMA_VERSION)
store.migrate()
testcase.assertTrue(store.ready)
testcase.assertEqual(store.schema_version, LATEST_SCHEMA_VERSION)
def _assert_save_read_and_query_contract(
testcase: TestCase,
store_factory: StoreFactory,
) -> None:
store = _migrated(store_factory)
records = _write_reference_records(store)
user = records["user"]
account = records["account"]
identity = records["identity"]
tenant_account = records["tenant_account"]
membership = records["membership"]
application = records["application"]
binding = records["binding"]
catalog = records["catalog"]
invitation = records["invitation"]
registration = records["registration"]
factor = records["factor"]
prepared_account = records["prepared_account"]
access_profile = records["access_profile"]
access_context = records["access_context"]
welcome_protocol = records["welcome_protocol"]
onboarding_journey = records["onboarding_journey"]
profile_value = records["profile_value"]
testcase.assertEqual(store.user(USER_ID), user)
testcase.assertEqual(store.user_account(USER_ID), account)
testcase.assertEqual(store.find_identity(identity.issuer, identity.subject), identity)
testcase.assertEqual(store.identities_for_user(USER_ID), (identity,))
testcase.assertEqual(store.tenant_account(TENANT, USER_ID), tenant_account)
testcase.assertEqual(store.memberships_for_user(USER_ID), (membership,))
testcase.assertEqual(store.memberships_for_user(USER_ID, tenant=TENANT), (membership,))
testcase.assertEqual(store.memberships_for_tenant(TENANT), (membership,))
testcase.assertEqual(store.application(application.application_id), application)
testcase.assertEqual(store.binding(binding.application_id), binding)
testcase.assertEqual(store.catalog(catalog.catalog_id), catalog)
testcase.assertEqual(store.all_catalogs(), (catalog,))
testcase.assertEqual(store.family_invitation(invitation.invitation_id), invitation)
testcase.assertEqual(store.family_invitations_for_user(USER_ID), (invitation,))
testcase.assertEqual(store.registration_session(registration.registration_id), registration)
testcase.assertEqual(store.all_registration_sessions(), (registration,))
testcase.assertEqual(store.identity_factor(factor.factor_id), factor)
testcase.assertEqual(store.factors_for_registration(registration.registration_id), (factor,))
testcase.assertEqual(store.factors_for_user(USER_ID), (factor,))
testcase.assertEqual(
store.prepared_account(prepared_account.prepared_account_id),
prepared_account,
)
testcase.assertEqual(store.prepared_accounts_for_tenant(TENANT), (prepared_account,))
testcase.assertEqual(store.access_profile(access_profile.access_profile_id), access_profile)
testcase.assertEqual(store.access_profiles_for_tenant(TENANT), (access_profile,))
testcase.assertEqual(store.active_access_context(USER_ID, TENANT), access_context)
testcase.assertEqual(store.active_access_contexts_for_tenant(TENANT), (access_context,))
testcase.assertEqual(store.welcome_protocol(welcome_protocol.protocol_id), welcome_protocol)
testcase.assertEqual(store.welcome_protocols_for_tenant(TENANT), (welcome_protocol,))
testcase.assertEqual(
store.onboarding_journey(onboarding_journey.journey_id),
onboarding_journey,
)
testcase.assertEqual(store.onboarding_journeys_for_user(USER_ID), (onboarding_journey,))
testcase.assertEqual(
store.onboarding_journeys_for_user(USER_ID, tenant=TENANT),
(onboarding_journey,),
)
testcase.assertEqual(store.onboarding_journeys_for_tenant(TENANT), (onboarding_journey,))
testcase.assertEqual(store.values_for_user(USER_ID), (profile_value,))
replacement = User(
user_id=USER_ID,
display_name="Replacement User",
primary_email=RAW_FACTOR_VALUE,
created_at=user.created_at,
)
store.save_user(replacement)
testcase.assertEqual(store.user(USER_ID), replacement)
def _assert_transaction_rollback_contract(
testcase: TestCase,
store_factory: StoreFactory,
) -> None:
store = _migrated(store_factory)
actor = _actor()
with testcase.assertRaises(RuntimeError):
with store.transaction():
store.save_user(User(user_id="usr_rollback", display_name="Rollback"))
store.append_audit(
AuditRecord(
audit_id="aud_rollback",
actor=actor,
action="store.write",
subject="usr_rollback",
tenant=TENANT,
correlation_id="corr-rollback",
summary="rolled back audit",
)
)
store.append_outbox(
OutboxEvent(
event_id="evt_rollback",
event_type="store.rollback",
aggregate_id="usr_rollback",
payload={"result": "rollback"},
tenant=TENANT,
correlation_id="corr-rollback",
)
)
raise RuntimeError("force rollback")
testcase.assertIsNone(store.user("usr_rollback"))
testcase.assertEqual(store.audit_log(), ())
testcase.assertEqual(store.pending_outbox(), ())
def _assert_outbox_ordering_contract(
testcase: TestCase,
store_factory: StoreFactory,
) -> None:
store = _migrated(store_factory)
events = (
OutboxEvent(
event_id="evt_order_1",
event_type="store.first",
aggregate_id=USER_ID,
payload={"sequence": 1},
tenant=TENANT,
correlation_id="corr-order",
),
OutboxEvent(
event_id="evt_order_2",
event_type="store.second",
aggregate_id=USER_ID,
payload={"sequence": 2},
tenant=TENANT,
correlation_id="corr-order",
),
)
for event in events:
store.append_outbox(event)
testcase.assertEqual(store.pending_outbox(), events)
def _assert_diagnostics_contract(
testcase: TestCase,
store_factory: StoreFactory,
) -> None:
store = _migrated(store_factory)
_write_reference_records(store)
counts = dict(store.record_counts())
testcase.assertEqual(set(counts), set(USER_ENGINE_RECORD_COUNT_KEYS))
testcase.assertTrue(all(isinstance(value, int) for value in counts.values()))
testcase.assertEqual(counts["bindings"], 1)
testcase.assertEqual(counts["pending_outbox_events"], 1)
diagnostics_text = repr(counts)
testcase.assertNotIn(RAW_FACTOR_VALUE, diagnostics_text)
testcase.assertNotIn(PROFILE_SECRET_VALUE, diagnostics_text)
def _migrated(store_factory: StoreFactory) -> UserEngineStore:
store = store_factory()
store.migrate()
return store
def _write_reference_records(store: UserEngineStore) -> dict[str, Any]:
actor = _actor()
user = User(
user_id=USER_ID,
display_name="Store Conformance User",
primary_email=RAW_FACTOR_VALUE,
)
account = Account(
account_id="acct_store_conformance",
user_id=USER_ID,
status=AccountStatus.ACTIVE,
)
identity = ExternalIdentity(
identity_id="eid_store_conformance",
user_id=USER_ID,
issuer="https://issuer.example.test",
subject="store-conformance",
provider="fixture",
)
tenant_account = TenantAccount(user_id=USER_ID, tenant=TENANT)
membership = Membership(
membership_id="mbr_store_conformance",
user_id=USER_ID,
tenant=TENANT,
scope_type="team",
scope_id="team:store-conformance",
kind="member",
)
application = Application(
application_id="app.store-conformance",
display_name="Store Conformance",
owner="team:store-conformance",
allowed_profile_scopes=(ProfileScope.GLOBAL, ProfileScope.APPLICATION),
allowed_projection_types=(ProjectionType.APPLICATION_RUNTIME,),
)
binding = ApplicationBinding(
application_id=application.application_id,
oidc_client_id="store-conformance-client",
protected_system_id="store-conformance.service",
catalog_namespaces=("store",),
event_source="store-conformance",
deployment_ref="local",
)
catalog = Catalog(
catalog_id="cat_store_conformance",
namespace="store",
version="1.0.0",
owning_application_id=application.application_id,
lifecycle=CatalogLifecycle.ACTIVE,
attributes=(
AttributeDefinition(
key="store.secret",
value_type="string",
scope=ProfileScope.GLOBAL,
sensitivity=Sensitivity.SECRET,
visibility=(Visibility.USER,),
mutability=(Mutability.USER,),
),
),
)
invitation = FamilyInvitation(
invitation_id="finv_store_conformance",
tenant=TENANT,
family_scope_id="family:store-conformance",
application_id=application.application_id,
user_id=USER_ID,
primary_email=RAW_FACTOR_VALUE,
role="adult",
status=InvitationStatus.PENDING,
invited_by=actor.subject,
)
registration = RegistrationSession(
tenant=TENANT,
registration_id="reg_store_conformance",
status=RegistrationStatus.FACTOR_VERIFIED,
required_factor_types=(IdentityFactorType.EMAIL,),
verified_factor_ids=("fac_store_conformance",),
user_id=USER_ID,
netkingdom_id="nk-store-conformance",
started_by_subject=actor.subject,
correlation_id="corr-store",
)
factor = IdentityFactor(
factor_id="fac_store_conformance",
factor_type=IdentityFactorType.EMAIL,
normalized_value=RAW_FACTOR_VALUE,
registration_id=registration.registration_id,
user_id=USER_ID,
display_value="s***@example.test",
)
prepared_account = PreparedAccount(
tenant=TENANT,
prepared_account_id="pacct_store_conformance",
required_factor_matches=(
PreparedFactorRequirement(
factor_type=IdentityFactorType.EMAIL,
normalized_value=RAW_FACTOR_VALUE,
),
),
entitlements=(
PreparedEntitlement(
kind=PreparedEntitlementKind.MEMBERSHIP,
tenant=TENANT,
scope_type="team",
scope_id="team:store-conformance",
role="member",
),
),
display_name="Prepared Store User",
primary_email=RAW_FACTOR_VALUE,
prepared_by_subject=actor.subject,
correlation_id="corr-store",
)
access_profile = AccessProfile(
tenant=TENANT,
display_name="Store Member",
hat="member",
access_profile_id="apf_store_conformance",
scope_type=AccessScopeType.TENANT,
scope_id=TENANT,
membership_requirements=(
AccessMembershipRequirement(
scope_type="team",
scope_id="team:store-conformance",
kind="member",
),
),
required_factor_types=(IdentityFactorType.EMAIL,),
profile_defaults={"store.secret": PROFILE_SECRET_VALUE},
claims={"role": "member"},
)
access_context = ActiveAccessContext(
active_context_id="actx_store_conformance",
user_id=USER_ID,
tenant=TENANT,
access_profile_id=access_profile.access_profile_id,
hat=access_profile.hat,
scope_type=AccessScopeType.TENANT,
scope_id=TENANT,
membership_ids=(membership.membership_id,),
factor_ids=(factor.factor_id,),
selected_by_subject=actor.subject,
)
welcome_protocol = WelcomeProtocol(
protocol_id="wpro_store_conformance",
tenant=TENANT,
name="Store Welcome",
trigger_type=OnboardingTriggerType.REGISTRATION,
steps=(
WelcomeProtocolStep(
step_key="profile",
title="Complete profile",
subsystem="user-account",
),
),
)
onboarding_journey = OnboardingJourney(
journey_id="ojrn_store_conformance",
tenant=TENANT,
user_id=USER_ID,
protocol_id=welcome_protocol.protocol_id,
trigger_type=OnboardingTriggerType.REGISTRATION,
steps=(
OnboardingStep(
step_key="profile",
title="Complete profile",
subsystem="user-account",
),
),
source_id=registration.registration_id,
source_event_type="registration.completed",
correlation_id="corr-store",
)
profile_value = ProfileValue(
user_id=USER_ID,
attribute_key="store.secret",
value=PROFILE_SECRET_VALUE,
scope=ProfileScope.GLOBAL,
)
audit_record = AuditRecord(
audit_id="aud_store_conformance",
actor=actor,
action="store.write",
subject=USER_ID,
tenant=TENANT,
correlation_id="corr-store",
summary="store conformance",
)
outbox_event = OutboxEvent(
event_id="evt_store_conformance",
event_type="store.changed",
aggregate_id=USER_ID,
payload={"kind": "store-conformance"},
tenant=TENANT,
correlation_id="corr-store",
)
store.save_user(user)
store.save_account(account)
store.save_identity(identity)
store.save_tenant_account(tenant_account)
store.save_membership(membership)
store.save_application(application)
store.save_binding(binding)
store.save_catalog(catalog)
store.save_family_invitation(invitation)
store.save_registration_session(registration)
store.save_identity_factor(factor)
store.save_prepared_account(prepared_account)
store.save_access_profile(access_profile)
store.save_active_access_context(access_context)
store.save_welcome_protocol(welcome_protocol)
store.save_onboarding_journey(onboarding_journey)
store.save_profile_value(profile_value)
store.append_audit(audit_record)
store.append_outbox(outbox_event)
return {
"user": user,
"account": account,
"identity": identity,
"tenant_account": tenant_account,
"membership": membership,
"application": application,
"binding": binding,
"catalog": catalog,
"invitation": invitation,
"registration": registration,
"factor": factor,
"prepared_account": prepared_account,
"access_profile": access_profile,
"access_context": access_context,
"welcome_protocol": welcome_protocol,
"onboarding_journey": onboarding_journey,
"profile_value": profile_value,
"audit_record": audit_record,
"outbox_event": outbox_event,
}
def _actor() -> Actor:
return Actor(
issuer="https://issuer.example.test",
subject="store-conformance",
tenant=TENANT,
principal_type=PrincipalType.HUMAN,
audience=("user-engine",),
roles=("user",),
scopes=("profile",),
)

View File

@@ -0,0 +1,42 @@
import unittest
from user_engine.adapters.local import InMemoryUserEngineStore, SCHEMA_VERSION
from user_engine.migrations import (
LATEST_SCHEMA_VERSION,
USER_ENGINE_RECORD_COUNT_KEYS,
USER_ENGINE_STORE_RECORD_TYPES,
migration_manifest,
validate_migration_manifest,
)
from user_engine.testing.store_conformance import (
assert_user_engine_migration_contract,
assert_user_engine_store_conformance,
)
class DurableStoreConformanceTests(unittest.TestCase):
def test_migration_manifest_is_ordered_and_provider_safe(self):
assert_user_engine_migration_contract(self)
manifest = migration_manifest()
self.assertEqual(validate_migration_manifest(), ())
self.assertEqual(manifest[-1].version, LATEST_SCHEMA_VERSION)
self.assertIn("users", USER_ENGINE_STORE_RECORD_TYPES)
self.assertIn("pending_outbox_events", USER_ENGINE_RECORD_COUNT_KEYS)
def test_local_schema_version_uses_latest_manifest_version(self):
store = InMemoryUserEngineStore()
store.migrate()
self.assertEqual(SCHEMA_VERSION, LATEST_SCHEMA_VERSION)
self.assertEqual(store.schema_version, LATEST_SCHEMA_VERSION)
self.assertTrue(store.ready)
def test_in_memory_store_satisfies_durable_store_contract(self):
assert_user_engine_store_conformance(self, InMemoryUserEngineStore)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,127 @@
---
id: USER-WP-0016
type: workplan
title: "Durable Store Migration And Conformance Harness"
domain: netkingdom
repo: user-engine
status: finished
owner: codex
topic_slug: netkingdom
planning_priority: medium
planning_order: 16
created: "2026-06-16"
updated: "2026-06-16"
depends_on:
- USER-WP-0009
- USER-WP-0015
state_hub_workstream_id: "91f9494a-7cc8-4e61-bec8-9440dfe0e63d"
---
# USER-WP-0016 - Durable Store Migration And Conformance Harness
## Goal
Turn the durable-store follow-up from USER-WP-0009 into an executable adapter
contract: ordered migrations, a Postgres bootstrap schema, and reusable
conformance checks that future database-backed stores can run against the same
behavior as the in-memory reference adapter.
## Scope Direction
This slice should stay inside user-engine and avoid production infrastructure
dependencies. It should define what a provider-backed adapter must satisfy
without requiring a live Postgres service in the local unit test suite.
## Non-Goals
- Do not add a production Postgres driver or connection lifecycle.
- Do not own platform provisioning, credentials, backups, restores, or
provider observability.
- Do not require Docker, a cloud database, or network access for the standard
test suite.
## Tasks
```task
id: USER-WP-0016-T1
status: done
priority: high
state_hub_task_id: "3783e912-76b5-4458-a8c5-a96bc8870e60"
```
Define an ordered migration manifest with the latest schema version and the
logical store record types covered by user-engine.
```task
id: USER-WP-0016-T2
status: done
priority: high
state_hub_task_id: "7d65e1fa-c7fb-4c83-9512-98296e6ccf4b"
```
Add a Postgres bootstrap SQL file that provider repositories can apply or
translate when implementing the store boundary.
```task
id: USER-WP-0016-T3
status: done
priority: high
state_hub_task_id: "c58db383-bfad-4f52-82d4-9d3c4558b1f2"
```
Add reusable store conformance helpers covering readiness, idempotent
migration, core save/read methods, tenant and user queries, transaction
rollback, outbox ordering, and redacted diagnostics.
```task
id: USER-WP-0016-T4
status: done
priority: medium
state_hub_task_id: "71e51aa8-2a2b-463d-938b-0a1f1261c568"
```
Run the conformance helpers against `InMemoryUserEngineStore` as the reference
implementation.
```task
id: USER-WP-0016-T5
status: done
priority: medium
state_hub_task_id: "e8d7cece-605d-4bb3-b0c6-b03fb908e7c0"
```
Document how future Postgres/provider adapters should consume the manifest,
SQL bootstrap file, and conformance harness.
## Acceptance Criteria
- `LATEST_SCHEMA_VERSION` and the local adapter schema version come from the
same manifest.
- The Postgres bootstrap file contains durable tables for schema versions,
logical records, audit records, and outbox events.
- A future adapter can import one conformance helper and run it with its own
store factory.
- Standard local tests prove the harness against the in-memory store.
- Diagnostics expose counts only and do not leak raw factor or profile values.
## Expected Outputs
- `user_engine.migrations` manifest.
- `migrations/postgres/0001_user_engine_store.sql`.
- `user_engine.testing.store_conformance` helper.
- Store conformance tests.
- Updated durable-store documentation.
## Implementation Notes
Implemented on 2026-06-16:
- Added an ordered migration manifest with logical record and diagnostic count
keys.
- Added a provider-facing Postgres bootstrap SQL file for the generic record,
audit, and outbox storage contract.
- Added reusable store conformance helpers and reference tests for the
in-memory adapter.
- Aligned local schema readiness with `LATEST_SCHEMA_VERSION`.
- Documented the harness in the durable-store consumer requirements and
contracts docs.